/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */
#include <unistd.h>

#include <rte_cryptodev.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define MC_SCHED_ENQ_RING_NAME_PREFIX	"MCS_ENQR_"
#define MC_SCHED_DEQ_RING_NAME_PREFIX	"MCS_DEQR_"

#define MC_SCHED_BUFFER_SIZE		32

#define CRYPTO_OP_STATUS_BIT_COMPLETE	0x80

/** multi-core scheduler context */
struct mc_scheduler_ctx {
	uint32_t num_workers;	/**< Number of workers polling */
	uint32_t stop_signal;

	struct rte_ring *sched_enq_ring[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKER_CORES];
	struct rte_ring *sched_deq_ring[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKER_CORES];
};

struct mc_scheduler_qp_ctx {
	struct scheduler_slave slaves[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES];
	uint32_t nb_slaves;

	uint32_t last_enq_worker_idx;
	uint32_t last_deq_worker_idx;

	struct mc_scheduler_ctx *mc_private_ctx;
};

static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
	uint16_t i, processed_ops = 0;

	if (unlikely(nb_ops == 0))
		return 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
		uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
				(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_queue_ops;
		processed_ops += nb_queue_ops;

		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}
	mc_qp_ctx->last_enq_worker_idx = worker_idx;

	return processed_ops;
}

static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
			nb_ops);
	uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
			nb_ops_to_enq);

	scheduler_order_insert(order_ring, ops, nb_ops_enqd);

	return nb_ops_enqd;
}

static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
	uint16_t i, processed_ops = 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
		uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
				(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_deq_ops;
		processed_ops += nb_deq_ops;
		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}

	mc_qp_ctx->last_deq_worker_idx = worker_idx;

	return processed_ops;
}
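
/*
 * Note on the ordering path (descriptive comment added for clarity):
 * when reordering is enabled, schedule_enqueue_ordering() above records every
 * successfully enqueued op in the queue pair's order ring.  Worker cores do
 * not hand completed ops back through their dequeue rings in that case;
 * instead they set CRYPTO_OP_STATUS_BIT_COMPLETE in op->status (see
 * mc_scheduler_worker()).  schedule_dequeue_ordering() below then releases
 * ops from the head of the order ring only up to the first op not yet marked
 * complete, which preserves the original enqueue order.
 */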

static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	struct rte_crypto_op *op;
	uint32_t nb_objs = rte_ring_count(order_ring);
	uint32_t nb_ops_to_deq = 0;
	uint32_t nb_ops_deqd = 0;

	if (nb_objs > nb_ops)
		nb_objs = nb_ops;

	while (nb_ops_to_deq < nb_objs) {
		SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq, op);

		if (!(op->status & CRYPTO_OP_STATUS_BIT_COMPLETE))
			break;

		op->status &= ~CRYPTO_OP_STATUS_BIT_COMPLETE;
		nb_ops_to_deq++;
	}

	if (nb_ops_to_deq) {
		nb_ops_deqd = rte_ring_sc_dequeue_bulk(order_ring,
				(void **)ops, nb_ops_to_deq, NULL);
	}

	return nb_ops_deqd;
}

static int
slave_attach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t slave_id)
{
	return 0;
}

static int
slave_detach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t slave_id)
{
	return 0;
}

static int
mc_scheduler_worker(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	struct rte_ring *enq_ring;
	struct rte_ring *deq_ring;
	uint32_t core_id = rte_lcore_id();
	int i, worker_idx = -1;
	struct scheduler_slave *slave;
	struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
	struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
	uint16_t processed_ops;
	uint16_t pending_enq_ops = 0;
	uint16_t pending_enq_ops_idx = 0;
	uint16_t pending_deq_ops = 0;
	uint16_t pending_deq_ops_idx = 0;
	uint16_t inflight_ops = 0;
	const uint8_t reordering_enabled = sched_ctx->reordering_enabled;

	for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
		if (sched_ctx->wc_pool[i] == core_id) {
			worker_idx = i;
			break;
		}
	}
	if (worker_idx == -1) {
		CS_LOG_ERR("worker on core %u: cannot find worker index!\n",
				core_id);
		return -1;
	}

	slave = &sched_ctx->slaves[worker_idx];
	enq_ring = mc_ctx->sched_enq_ring[worker_idx];
	deq_ring = mc_ctx->sched_deq_ring[worker_idx];

	while (!mc_ctx->stop_signal) {
		if (pending_enq_ops) {
			processed_ops =
				rte_cryptodev_enqueue_burst(slave->dev_id,
					slave->qp_id,
					&enq_ops[pending_enq_ops_idx],
					pending_enq_ops);
			pending_enq_ops -= processed_ops;
			pending_enq_ops_idx += processed_ops;
			inflight_ops += processed_ops;
		} else {
			processed_ops = rte_ring_dequeue_burst(enq_ring,
					(void *)enq_ops, MC_SCHED_BUFFER_SIZE,
					NULL);
			if (processed_ops) {
				pending_enq_ops_idx =
					rte_cryptodev_enqueue_burst(
						slave->dev_id, slave->qp_id,
						enq_ops, processed_ops);
				pending_enq_ops = processed_ops -
						pending_enq_ops_idx;
				inflight_ops += pending_enq_ops_idx;
			}
		}

		if (pending_deq_ops) {
			processed_ops = rte_ring_enqueue_burst(deq_ring,
					(void *)&deq_ops[pending_deq_ops_idx],
					pending_deq_ops, NULL);
			pending_deq_ops -= processed_ops;
			pending_deq_ops_idx += processed_ops;
		} else if (inflight_ops) {
			processed_ops = rte_cryptodev_dequeue_burst(
					slave->dev_id, slave->qp_id, deq_ops,
					MC_SCHED_BUFFER_SIZE);
			if (processed_ops) {
				inflight_ops -= processed_ops;
				if (reordering_enabled) {
					uint16_t j;

					for (j = 0; j < processed_ops; j++) {
						deq_ops[j]->status |=
							CRYPTO_OP_STATUS_BIT_COMPLETE;
					}
				} else {
					pending_deq_ops_idx =
						rte_ring_enqueue_burst(
							deq_ring,
							(void *)deq_ops,
							processed_ops, NULL);
					pending_deq_ops = processed_ops -
							pending_deq_ops_idx;
				}
			}
		}

		rte_pause();
	}

	return 0;
}
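
/*
 * Data-path overview (descriptive comment added for clarity):
 * scheduler_start() below launches mc_scheduler_worker() on each lcore in
 * the scheduler's worker-core pool via rte_eal_remote_launch() and installs
 * the enqueue/dequeue burst functions above on the scheduler device.  The
 * frontend lcore (the one calling the scheduler's burst functions)
 * round-robins ops into the per-worker enqueue rings (schedule_enqueue());
 * each worker drains its ring into its slave device, then either pushes
 * completed ops to its dequeue ring or, when reordering is enabled, marks
 * them with CRYPTO_OP_STATUS_BIT_COMPLETE; the frontend lcore finally
 * collects them with schedule_dequeue() or schedule_dequeue_ordering().
 */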

static int
scheduler_start(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 0;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_remote_launch(
			(lcore_function_t *)mc_scheduler_worker, dev,
			sched_ctx->wc_pool[i]);

	if (sched_ctx->reordering_enabled) {
		dev->enqueue_burst = &schedule_enqueue_ordering;
		dev->dequeue_burst = &schedule_dequeue_ordering;
	} else {
		dev->enqueue_burst = &schedule_enqueue;
		dev->dequeue_burst = &schedule_dequeue;
	}

	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
		struct mc_scheduler_qp_ctx *mc_qp_ctx =
				qp_ctx->private_qp_ctx;
		uint32_t j;

		memset(mc_qp_ctx->slaves, 0,
				RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES *
				sizeof(struct scheduler_slave));
		for (j = 0; j < sched_ctx->nb_slaves; j++) {
			mc_qp_ctx->slaves[j].dev_id =
					sched_ctx->slaves[j].dev_id;
			mc_qp_ctx->slaves[j].qp_id = i;
		}

		mc_qp_ctx->nb_slaves = sched_ctx->nb_slaves;

		mc_qp_ctx->last_enq_worker_idx = 0;
		mc_qp_ctx->last_deq_worker_idx = 0;
	}

	return 0;
}

static int
scheduler_stop(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 1;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_wait_lcore(sched_ctx->wc_pool[i]);

	return 0;
}

static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
	struct mc_scheduler_qp_ctx *mc_qp_ctx;
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

	mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
			rte_socket_id());
	if (!mc_qp_ctx) {
		CS_LOG_ERR("failed to allocate memory for private queue pair");
		return -ENOMEM;
	}

	mc_qp_ctx->mc_private_ctx = mc_ctx;
	qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;

	return 0;
}
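
/*
 * Ring allocation note (descriptive comment added for clarity):
 * scheduler_create_private_ctx() below allocates the multi-core context and
 * one enqueue/dequeue ring pair per configured worker core.  The rings are
 * created with RING_F_SP_ENQ | RING_F_SC_DEQ, i.e. each ring expects exactly
 * one producer and one consumer, so driving the scheduler's queue pairs from
 * several lcores concurrently would need external synchronisation.
 */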

static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = NULL;
	uint16_t i;

	if (sched_ctx->private_ctx) {
		rte_free(sched_ctx->private_ctx);
		sched_ctx->private_ctx = NULL;
	}

	mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
			rte_socket_id());
	if (!mc_ctx) {
		CS_LOG_ERR("failed to allocate memory");
		return -ENOMEM;
	}

	mc_ctx->num_workers = sched_ctx->nb_wc;
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		char r_name[16];

		snprintf(r_name, sizeof(r_name),
				MC_SCHED_ENQ_RING_NAME_PREFIX "%u", i);
		mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name,
				PER_SLAVE_BUFF_SIZE, rte_socket_id(),
				RING_F_SC_DEQ | RING_F_SP_ENQ);
		if (!mc_ctx->sched_enq_ring[i]) {
			CS_LOG_ERR("Cannot create ring for worker %u", i);
			goto exit;
		}
		snprintf(r_name, sizeof(r_name),
				MC_SCHED_DEQ_RING_NAME_PREFIX "%u", i);
		mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name,
				PER_SLAVE_BUFF_SIZE, rte_socket_id(),
				RING_F_SC_DEQ | RING_F_SP_ENQ);
		if (!mc_ctx->sched_deq_ring[i]) {
			CS_LOG_ERR("Cannot create ring for worker %u", i);
			goto exit;
		}
	}

	sched_ctx->private_ctx = (void *)mc_ctx;

	return 0;

exit:
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		rte_ring_free(mc_ctx->sched_enq_ring[i]);
		rte_ring_free(mc_ctx->sched_deq_ring[i]);
	}
	rte_free(mc_ctx);

	return -1;
}

struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
	slave_attach,
	slave_detach,
	scheduler_start,
	scheduler_stop,
	scheduler_config_qp,
	scheduler_create_private_ctx,
	NULL,	/* option_set */
	NULL	/* option_get */
};

struct rte_cryptodev_scheduler mc_scheduler = {
	.name = "multicore-scheduler",
	.description = "scheduler which will run burst across multiple cpu cores",
	.mode = CDEV_SCHED_MODE_MULTICORE,
	.ops = &scheduler_mc_ops
};

struct rte_cryptodev_scheduler *multicore_scheduler = &mc_scheduler;
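
/*
 * Usage sketch (illustrative only, not part of this driver): an application
 * typically selects this mode when creating the scheduler vdev, supplying the
 * worker lcores through the scheduler's vdev arguments.  The argument names
 * and slave PMD names below follow the scheduler PMD documentation and are
 * examples that may vary between DPDK releases:
 *
 *   --vdev "crypto_scheduler,slave=crypto_aesni_mb1,slave=crypto_aesni_mb2,\
 *           mode=multi-core,corelist=2,3"
 *
 * Alternatively, assuming `scheduler_id` is the scheduler cryptodev's id and
 * the worker corelist was already given on the vdev command line, the mode
 * can be switched programmatically before the device is started:
 *
 *   #include <rte_cryptodev_scheduler.h>
 *
 *   if (rte_cryptodev_scheduler_mode_set(scheduler_id,
 *           CDEV_SCHED_MODE_MULTICORE) < 0)
 *       rte_exit(EXIT_FAILURE, "cannot set multi-core scheduler mode\n");
 */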