/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */
#include <unistd.h>

#include <rte_cryptodev.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define MC_SCHED_ENQ_RING_NAME_PREFIX	"MCS_ENQR_"
#define MC_SCHED_DEQ_RING_NAME_PREFIX	"MCS_DEQR_"

#define MC_SCHED_BUFFER_SIZE		32

#define CRYPTO_OP_STATUS_BIT_COMPLETE	0x80

/** multi-core scheduler context */
struct mc_scheduler_ctx {
	uint32_t num_workers;		/**< Number of workers polling */
	uint32_t stop_signal;

	struct rte_ring *sched_enq_ring[RTE_MAX_LCORE];
	struct rte_ring *sched_deq_ring[RTE_MAX_LCORE];
};

struct mc_scheduler_qp_ctx {
	struct scheduler_slave slaves[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES];
	uint32_t nb_slaves;

	uint32_t last_enq_worker_idx;
	uint32_t last_deq_worker_idx;

	struct mc_scheduler_ctx *mc_private_ctx;
};

/* Spread an incoming burst round-robin across the per-worker enqueue rings. */
static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
	uint16_t i, processed_ops = 0;

	if (unlikely(nb_ops == 0))
		return 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
		uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_queue_ops;
		processed_ops += nb_queue_ops;

		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}
	mc_qp_ctx->last_enq_worker_idx = worker_idx;

	return processed_ops;
}

/* Same as schedule_enqueue(), but also records the ops in the order ring so
 * they can later be handed back in their original order.
 */
static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
			nb_ops);
	uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
			nb_ops_to_enq);

	scheduler_order_insert(order_ring, ops, nb_ops_enqd);

	return nb_ops_enqd;
}

/* Collect processed ops round-robin from the per-worker dequeue rings. */
static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
	uint16_t i, processed_ops = 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
		uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_deq_ops;
		processed_ops += nb_deq_ops;
		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}

	mc_qp_ctx->last_deq_worker_idx = worker_idx;

	return processed_ops;
}

/* Dequeue from the order ring so that ops are delivered in the same order in
 * which they were enqueued.
 */
static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	struct rte_crypto_op *op;
	uint32_t nb_objs, nb_ops_to_deq;

	nb_objs = rte_ring_dequeue_burst_start(order_ring, (void **)ops,
			nb_ops, NULL);
	if (nb_objs == 0)
		return 0;

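	/*
	 * Hand ops back in enqueue order: stop at the first op that a worker
	 * core has not yet flagged as complete, clearing the completion bit
	 * on every op that is returned to the application.
	 */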
	for (nb_ops_to_deq = 0; nb_ops_to_deq != nb_objs; nb_ops_to_deq++) {
		op = ops[nb_ops_to_deq];
		if (!(op->status & CRYPTO_OP_STATUS_BIT_COMPLETE))
			break;
		op->status &= ~CRYPTO_OP_STATUS_BIT_COMPLETE;
	}

	rte_ring_dequeue_finish(order_ring, nb_ops_to_deq);
	return nb_ops_to_deq;
}

static int
slave_attach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t slave_id)
{
	return 0;
}

static int
slave_detach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t slave_id)
{
	return 0;
}

/* Polling loop run on each worker lcore: pull ops from this worker's enqueue
 * ring, submit them to the assigned slave queue pair, then collect completed
 * ops from the slave and either mark them complete (reordering mode) or push
 * them onto this worker's dequeue ring.
 */
static int
mc_scheduler_worker(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	struct rte_ring *enq_ring;
	struct rte_ring *deq_ring;
	uint32_t core_id = rte_lcore_id();
	int i, worker_idx = -1;
	struct scheduler_slave *slave;
	struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
	struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
	uint16_t processed_ops;
	uint16_t pending_enq_ops = 0;
	uint16_t pending_enq_ops_idx = 0;
	uint16_t pending_deq_ops = 0;
	uint16_t pending_deq_ops_idx = 0;
	uint16_t inflight_ops = 0;
	const uint8_t reordering_enabled = sched_ctx->reordering_enabled;

	for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
		if (sched_ctx->wc_pool[i] == core_id) {
			worker_idx = i;
			break;
		}
	}
	if (worker_idx == -1) {
		CR_SCHED_LOG(ERR, "worker on core %u: cannot find worker index!",
			core_id);
		return -1;
	}

	slave = &sched_ctx->slaves[worker_idx];
	enq_ring = mc_ctx->sched_enq_ring[worker_idx];
	deq_ring = mc_ctx->sched_deq_ring[worker_idx];

	while (!mc_ctx->stop_signal) {
		if (pending_enq_ops) {
			processed_ops =
				rte_cryptodev_enqueue_burst(slave->dev_id,
					slave->qp_id,
					&enq_ops[pending_enq_ops_idx],
					pending_enq_ops);
			pending_enq_ops -= processed_ops;
			pending_enq_ops_idx += processed_ops;
			inflight_ops += processed_ops;
		} else {
			processed_ops = rte_ring_dequeue_burst(enq_ring,
				(void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL);
			if (processed_ops) {
				pending_enq_ops_idx =
					rte_cryptodev_enqueue_burst(
						slave->dev_id, slave->qp_id,
						enq_ops, processed_ops);
				pending_enq_ops = processed_ops -
						pending_enq_ops_idx;
				inflight_ops += pending_enq_ops_idx;
			}
		}

		if (pending_deq_ops) {
			processed_ops = rte_ring_enqueue_burst(
					deq_ring,
					(void *)&deq_ops[pending_deq_ops_idx],
					pending_deq_ops, NULL);
			pending_deq_ops -= processed_ops;
			pending_deq_ops_idx += processed_ops;
		} else if (inflight_ops) {
			processed_ops = rte_cryptodev_dequeue_burst(
					slave->dev_id, slave->qp_id,
					deq_ops, MC_SCHED_BUFFER_SIZE);
			if (processed_ops) {
				inflight_ops -= processed_ops;
				if (reordering_enabled) {
					uint16_t j;

					for (j = 0; j < processed_ops; j++) {
						deq_ops[j]->status |=
							CRYPTO_OP_STATUS_BIT_COMPLETE;
					}
				} else {
					pending_deq_ops_idx =
						rte_ring_enqueue_burst(
							deq_ring,
							(void *)deq_ops,
							processed_ops, NULL);
					pending_deq_ops = processed_ops -
							pending_deq_ops_idx;
				}
			}
		}

		rte_pause();
	}

	return 0;
}

static int
scheduler_start(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 0;

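	/* Launch one mc_scheduler_worker() instance on every configured
	 * worker lcore.
	 */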
	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_remote_launch(
			(lcore_function_t *)mc_scheduler_worker, dev,
			sched_ctx->wc_pool[i]);

	if (sched_ctx->reordering_enabled) {
		dev->enqueue_burst = &schedule_enqueue_ordering;
		dev->dequeue_burst = &schedule_dequeue_ordering;
	} else {
		dev->enqueue_burst = &schedule_enqueue;
		dev->dequeue_burst = &schedule_dequeue;
	}

	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
		struct mc_scheduler_qp_ctx *mc_qp_ctx =
				qp_ctx->private_qp_ctx;
		uint32_t j;

		memset(mc_qp_ctx->slaves, 0,
				RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES *
				sizeof(struct scheduler_slave));
		for (j = 0; j < sched_ctx->nb_slaves; j++) {
			mc_qp_ctx->slaves[j].dev_id =
					sched_ctx->slaves[j].dev_id;
			mc_qp_ctx->slaves[j].qp_id = i;
		}

		mc_qp_ctx->nb_slaves = sched_ctx->nb_slaves;

		mc_qp_ctx->last_enq_worker_idx = 0;
		mc_qp_ctx->last_deq_worker_idx = 0;
	}

	return 0;
}

static int
scheduler_stop(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 1;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_wait_lcore(sched_ctx->wc_pool[i]);

	return 0;
}

static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
	struct mc_scheduler_qp_ctx *mc_qp_ctx;
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

	mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
			rte_socket_id());
	if (!mc_qp_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory for private queue pair");
		return -ENOMEM;
	}

	mc_qp_ctx->mc_private_ctx = mc_ctx;
	qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;

	return 0;
}

/* Allocate the shared multi-core context and the per-worker enqueue/dequeue
 * rings; rings left over from a previous instance with the same name are
 * looked up and reused instead of being recreated.
 */
static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = NULL;
	uint16_t i;

	if (sched_ctx->private_ctx) {
		rte_free(sched_ctx->private_ctx);
		sched_ctx->private_ctx = NULL;
	}

	mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
			rte_socket_id());
	if (!mc_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory");
		return -ENOMEM;
	}

	mc_ctx->num_workers = sched_ctx->nb_wc;
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		char r_name[16];

		snprintf(r_name, sizeof(r_name), MC_SCHED_ENQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_enq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_enq_ring[i]) {
			mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name,
					PER_SLAVE_BUFF_SIZE,
					rte_socket_id(),
					RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_enq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
						i);
				goto exit;
			}
		}
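		/* The matching dequeue ring carries completed ops from this
		 * worker back to the application-facing queue pair.
		 */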
		snprintf(r_name, sizeof(r_name), MC_SCHED_DEQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_deq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_deq_ring[i]) {
			mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name,
					PER_SLAVE_BUFF_SIZE,
					rte_socket_id(),
					RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_deq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
						i);
				goto exit;
			}
		}
	}

	sched_ctx->private_ctx = (void *)mc_ctx;

	return 0;

exit:
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		rte_ring_free(mc_ctx->sched_enq_ring[i]);
		rte_ring_free(mc_ctx->sched_deq_ring[i]);
	}
	rte_free(mc_ctx);

	return -1;
}

static struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
	slave_attach,
	slave_detach,
	scheduler_start,
	scheduler_stop,
	scheduler_config_qp,
	scheduler_create_private_ctx,
	NULL,	/* option_set */
	NULL	/* option_get */
};

static struct rte_cryptodev_scheduler mc_scheduler = {
	.name = "multicore-scheduler",
	.description = "scheduler which will run burst across multiple cpu cores",
	.mode = CDEV_SCHED_MODE_MULTICORE,
	.ops = &scheduler_mc_ops
};

struct rte_cryptodev_scheduler *crypto_scheduler_multicore = &mc_scheduler;