/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */
#include <unistd.h>

#include <rte_cryptodev.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define MC_SCHED_ENQ_RING_NAME_PREFIX	"MCS_ENQR_"
#define MC_SCHED_DEQ_RING_NAME_PREFIX	"MCS_DEQR_"

#define MC_SCHED_BUFFER_SIZE		32

#define CRYPTO_OP_STATUS_BIT_COMPLETE	0x80

/** multi-core scheduler context */
struct mc_scheduler_ctx {
	uint32_t num_workers;	/**< Number of workers polling */
	uint32_t stop_signal;

	struct rte_ring *sched_enq_ring[RTE_MAX_LCORE];
	struct rte_ring *sched_deq_ring[RTE_MAX_LCORE];
};

struct mc_scheduler_qp_ctx {
	struct scheduler_worker workers[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS];
	uint32_t nb_workers;

	uint32_t last_enq_worker_idx;
	uint32_t last_deq_worker_idx;

	struct mc_scheduler_ctx *mc_private_ctx;
};

static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
	uint16_t i, processed_ops = 0;

	if (unlikely(nb_ops == 0))
		return 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
		uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_queue_ops;
		processed_ops += nb_queue_ops;

		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}
	mc_qp_ctx->last_enq_worker_idx = worker_idx;

	return processed_ops;
}

static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
			nb_ops);
	uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
			nb_ops_to_enq);

	scheduler_order_insert(order_ring, ops, nb_ops_enqd);

	return nb_ops_enqd;
}

static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
	uint16_t i, processed_ops = 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
		uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_deq_ops;
		processed_ops += nb_deq_ops;
		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}

	mc_qp_ctx->last_deq_worker_idx = worker_idx;

	return processed_ops;
}

static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	struct rte_crypto_op *op;
	uint32_t nb_objs, nb_ops_to_deq;

	nb_objs = rte_ring_dequeue_burst_start(order_ring, (void **)ops,
			nb_ops, NULL);
	if (nb_objs == 0)
		return 0;

	for (nb_ops_to_deq = 0; nb_ops_to_deq != nb_objs; nb_ops_to_deq++) {
		op = ops[nb_ops_to_deq];
		if (!(op->status & CRYPTO_OP_STATUS_BIT_COMPLETE))
			break;
		op->status &= ~CRYPTO_OP_STATUS_BIT_COMPLETE;
	}

	rte_ring_dequeue_finish(order_ring, nb_ops_to_deq);
	return nb_ops_to_deq;
}

static int
worker_attach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t worker_id)
{
	return 0;
}

static int
worker_detach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t worker_id)
{
	return 0;
}
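/*
 * Main loop run on each scheduling lcore. The lcore finds its own index
 * in sched_ctx->wc_pool, then repeatedly:
 *  - pulls a burst of crypto ops from its per-core enqueue ring and
 *    submits them to the attached worker cryptodev queue pair, retrying
 *    on later iterations any ops the worker PMD could not accept;
 *  - dequeues completed ops from the worker PMD and either flags them
 *    with CRYPTO_OP_STATUS_BIT_COMPLETE (when reordering is enabled, so
 *    schedule_dequeue_ordering() can release them in order) or pushes
 *    them onto its per-core dequeue ring for schedule_dequeue().
 * The loop exits once mc_ctx->stop_signal is set by scheduler_stop().
 */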
static int
mc_scheduler_worker(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	struct rte_ring *enq_ring;
	struct rte_ring *deq_ring;
	uint32_t core_id = rte_lcore_id();
	int i, worker_idx = -1;
	struct scheduler_worker *worker;
	struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
	struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
	uint16_t processed_ops;
	uint16_t pending_enq_ops = 0;
	uint16_t pending_enq_ops_idx = 0;
	uint16_t pending_deq_ops = 0;
	uint16_t pending_deq_ops_idx = 0;
	uint16_t inflight_ops = 0;
	const uint8_t reordering_enabled = sched_ctx->reordering_enabled;

	for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
		if (sched_ctx->wc_pool[i] == core_id) {
			worker_idx = i;
			break;
		}
	}
	if (worker_idx == -1) {
		CR_SCHED_LOG(ERR, "worker on core %u: cannot find worker index!",
			core_id);
		return -1;
	}

	worker = &sched_ctx->workers[worker_idx];
	enq_ring = mc_ctx->sched_enq_ring[worker_idx];
	deq_ring = mc_ctx->sched_deq_ring[worker_idx];

	while (!mc_ctx->stop_signal) {
		if (pending_enq_ops) {
			processed_ops =
				rte_cryptodev_enqueue_burst(worker->dev_id,
					worker->qp_id,
					&enq_ops[pending_enq_ops_idx],
					pending_enq_ops);
			pending_enq_ops -= processed_ops;
			pending_enq_ops_idx += processed_ops;
			inflight_ops += processed_ops;
		} else {
			processed_ops = rte_ring_dequeue_burst(enq_ring,
				(void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL);
			if (processed_ops) {
				pending_enq_ops_idx = rte_cryptodev_enqueue_burst(
						worker->dev_id, worker->qp_id,
						enq_ops, processed_ops);
				pending_enq_ops = processed_ops -
						pending_enq_ops_idx;
				inflight_ops += pending_enq_ops_idx;
			}
		}

		if (pending_deq_ops) {
			processed_ops = rte_ring_enqueue_burst(
				deq_ring, (void *)&deq_ops[pending_deq_ops_idx],
				pending_deq_ops, NULL);
			pending_deq_ops -= processed_ops;
			pending_deq_ops_idx += processed_ops;
		} else if (inflight_ops) {
			processed_ops = rte_cryptodev_dequeue_burst(
				worker->dev_id, worker->qp_id, deq_ops,
				MC_SCHED_BUFFER_SIZE);
			if (processed_ops) {
				inflight_ops -= processed_ops;
				if (reordering_enabled) {
					uint16_t j;

					for (j = 0; j < processed_ops; j++) {
						deq_ops[j]->status |=
							CRYPTO_OP_STATUS_BIT_COMPLETE;
					}
				} else {
					pending_deq_ops_idx = rte_ring_enqueue_burst(
						deq_ring, (void *)deq_ops,
						processed_ops, NULL);
					pending_deq_ops = processed_ops -
						pending_deq_ops_idx;
				}
			}
		}

		rte_pause();
	}

	return 0;
}
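/*
 * Start callback: launch one mc_scheduler_worker() loop on every
 * configured worker lcore via rte_eal_remote_launch(), select the
 * ordering or plain enqueue/dequeue burst functions depending on
 * sched_ctx->reordering_enabled, and initialise the per-queue-pair
 * worker table.
 */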
static int
scheduler_start(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 0;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_remote_launch(
			(lcore_function_t *)mc_scheduler_worker, dev,
			sched_ctx->wc_pool[i]);

	if (sched_ctx->reordering_enabled) {
		dev->enqueue_burst = &schedule_enqueue_ordering;
		dev->dequeue_burst = &schedule_dequeue_ordering;
	} else {
		dev->enqueue_burst = &schedule_enqueue;
		dev->dequeue_burst = &schedule_dequeue;
	}

	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
		struct mc_scheduler_qp_ctx *mc_qp_ctx =
				qp_ctx->private_qp_ctx;
		uint32_t j;

		memset(mc_qp_ctx->workers, 0,
				RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS *
				sizeof(struct scheduler_worker));
		for (j = 0; j < sched_ctx->nb_workers; j++) {
			mc_qp_ctx->workers[j].dev_id =
					sched_ctx->workers[j].dev_id;
			mc_qp_ctx->workers[j].qp_id = i;
		}

		mc_qp_ctx->nb_workers = sched_ctx->nb_workers;

		mc_qp_ctx->last_enq_worker_idx = 0;
		mc_qp_ctx->last_deq_worker_idx = 0;
	}

	return 0;
}

static int
scheduler_stop(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 1;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_wait_lcore(sched_ctx->wc_pool[i]);

	return 0;
}

static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
	struct mc_scheduler_qp_ctx *mc_qp_ctx;
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

	mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
			rte_socket_id());
	if (!mc_qp_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory for private queue pair");
		return -ENOMEM;
	}

	mc_qp_ctx->mc_private_ctx = mc_ctx;
	qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;

	return 0;
}
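/*
 * Allocate the scheduler's private context and, for each worker lcore,
 * look up or create a single-producer/single-consumer enqueue ring and
 * dequeue ring named after the device id and lcore index. On failure,
 * any rings created so far and the context are freed.
 */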
static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = NULL;
	uint16_t i;

	if (sched_ctx->private_ctx) {
		rte_free(sched_ctx->private_ctx);
		sched_ctx->private_ctx = NULL;
	}

	mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
			rte_socket_id());
	if (!mc_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory");
		return -ENOMEM;
	}

	mc_ctx->num_workers = sched_ctx->nb_wc;
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		char r_name[16];

		snprintf(r_name, sizeof(r_name), MC_SCHED_ENQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_enq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_enq_ring[i]) {
			mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name,
					PER_WORKER_BUFF_SIZE,
					rte_socket_id(),
					RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_enq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
						i);
				goto exit;
			}
		}
		snprintf(r_name, sizeof(r_name), MC_SCHED_DEQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_deq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_deq_ring[i]) {
			mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name,
					PER_WORKER_BUFF_SIZE,
					rte_socket_id(),
					RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_deq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
						i);
				goto exit;
			}
		}
	}

	sched_ctx->private_ctx = (void *)mc_ctx;

	return 0;

exit:
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		rte_ring_free(mc_ctx->sched_enq_ring[i]);
		rte_ring_free(mc_ctx->sched_deq_ring[i]);
	}
	rte_free(mc_ctx);

	return -1;
}

static struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
	worker_attach,
	worker_detach,
	scheduler_start,
	scheduler_stop,
	scheduler_config_qp,
	scheduler_create_private_ctx,
	NULL,	/* option_set */
	NULL	/* option_get */
};

static struct rte_cryptodev_scheduler mc_scheduler = {
		.name = "multicore-scheduler",
		.description = "scheduler which will run burst across multiple cpu cores",
		.mode = CDEV_SCHED_MODE_MULTICORE,
		.ops = &scheduler_mc_ops
};

struct rte_cryptodev_scheduler *crypto_scheduler_multicore = &mc_scheduler;