/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <cryptodev_pmd.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define DEF_PKT_SIZE_THRESHOLD			(0xffffff80)
#define WORKER_IDX_SWITCH_MASK			(0x01)
#define PRIMARY_WORKER_IDX			0
#define SECONDARY_WORKER_IDX			1
#define NB_PKT_SIZE_WORKERS			2
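
/*
 * The threshold is kept as the inverted bit mask ~(threshold - 1), so
 * DEF_PKT_SIZE_THRESHOLD corresponds to the default threshold of 128 bytes:
 * jobs of at least the threshold length are scheduled to the primary worker,
 * shorter jobs to the secondary worker.
 */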

/** pkt size based scheduler context */
struct psd_scheduler_ctx {
	uint32_t threshold;
};

/** pkt size based scheduler queue pair context */
struct __rte_cache_aligned psd_scheduler_qp_ctx {
	struct scheduler_worker primary_worker;
	struct scheduler_worker secondary_worker;
	uint32_t threshold;
	uint8_t deq_idx;
};

/** per-worker scheduling state built up during an enqueue burst */
struct psd_schedule_op {
	uint8_t worker_idx;
	uint16_t pos;
};

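/**
 * Distribute a burst of crypto ops between the two workers based on job
 * length: ops at or above the configured threshold go to the primary
 * worker, shorter ops to the secondary worker. The main loop is unrolled
 * four ways and prefetches upcoming sessions to hide memory latency.
 */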
static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct scheduler_qp_ctx *qp_ctx = qp;
	struct psd_scheduler_qp_ctx *psd_qp_ctx = qp_ctx->private_qp_ctx;
	struct rte_crypto_op *sched_ops[NB_PKT_SIZE_WORKERS][nb_ops];
	uint32_t in_flight_ops[NB_PKT_SIZE_WORKERS] = {
			psd_qp_ctx->primary_worker.nb_inflight_cops,
			psd_qp_ctx->secondary_worker.nb_inflight_cops
	};
	struct psd_schedule_op enq_ops[NB_PKT_SIZE_WORKERS] = {
		{PRIMARY_WORKER_IDX, 0}, {SECONDARY_WORKER_IDX, 0}
	};
	struct psd_schedule_op *p_enq_op;
	uint16_t i, processed_ops_pri = 0, processed_ops_sec = 0;

	if (unlikely(nb_ops == 0))
		return 0;

	for (i = 0; i < nb_ops && i < 4; i++) {
		rte_prefetch0(ops[i]->sym);
		rte_prefetch0((uint8_t *)ops[i]->sym->session +
			sizeof(struct rte_cryptodev_sym_session));
	}

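	/*
	 * Main loop: classify four ops per iteration and prefetch the
	 * sessions of the following four ops. The loop stops eight ops
	 * before the end of the burst so the prefetches stay in range.
	 */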
	for (i = 0; (i < (nb_ops - 8)) && (nb_ops > 8); i += 4) {
		uint8_t target[4];
		uint32_t job_len[4];

		rte_prefetch0(ops[i + 4]->sym);
		rte_prefetch0((uint8_t *)ops[i + 4]->sym->session +
			sizeof(struct rte_cryptodev_sym_session));
		rte_prefetch0(ops[i + 5]->sym);
		rte_prefetch0((uint8_t *)ops[i + 5]->sym->session +
			sizeof(struct rte_cryptodev_sym_session));
		rte_prefetch0(ops[i + 6]->sym);
		rte_prefetch0((uint8_t *)ops[i + 6]->sym->session +
			sizeof(struct rte_cryptodev_sym_session));
		rte_prefetch0(ops[i + 7]->sym);
		rte_prefetch0((uint8_t *)ops[i + 7]->sym->session +
			sizeof(struct rte_cryptodev_sym_session));

		job_len[0] = scheduler_get_job_len(ops[i]);
		/* decide the target worker based on the job length */
		target[0] = !(job_len[0] & psd_qp_ctx->threshold);
		p_enq_op = &enq_ops[target[0]];

		/* stop scheduling ops before the worker queue fills up,
		 * so that the enqueue burst below cannot fail
		 */
		if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] ==
				qp_ctx->max_nb_objs) {
			i = nb_ops;
			break;
		}

		scheduler_set_single_worker_session(ops[i], target[0]);
		sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i];
		p_enq_op->pos++;

		job_len[1] = scheduler_get_job_len(ops[i + 1]);
		target[1] = !(job_len[1] & psd_qp_ctx->threshold);
		p_enq_op = &enq_ops[target[1]];

		if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] ==
				qp_ctx->max_nb_objs) {
			i = nb_ops;
			break;
		}

		scheduler_set_single_worker_session(ops[i + 1], target[1]);
		sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i + 1];
		p_enq_op->pos++;

		job_len[2] = scheduler_get_job_len(ops[i + 2]);
		target[2] = !(job_len[2] & psd_qp_ctx->threshold);
		p_enq_op = &enq_ops[target[2]];

		if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] ==
				qp_ctx->max_nb_objs) {
			i = nb_ops;
			break;
		}

		scheduler_set_single_worker_session(ops[i + 2], target[2]);
		sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i + 2];
		p_enq_op->pos++;

		job_len[3] = scheduler_get_job_len(ops[i + 3]);
		target[3] = !(job_len[3] & psd_qp_ctx->threshold);
		p_enq_op = &enq_ops[target[3]];

		if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] ==
				qp_ctx->max_nb_objs) {
			i = nb_ops;
			break;
		}

		scheduler_set_single_worker_session(ops[i + 3], target[3]);
		sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i + 3];
		p_enq_op->pos++;
	}

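	/* handle the ops left over from the unrolled loop one at a time */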
	for (; i < nb_ops; i++) {
		uint32_t job_len;
		uint8_t target;

		job_len = scheduler_get_job_len(ops[i]);
		target = !(job_len & psd_qp_ctx->threshold);
		p_enq_op = &enq_ops[target];

		if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] ==
				qp_ctx->max_nb_objs) {
			i = nb_ops;
			break;
		}

		scheduler_set_single_worker_session(ops[i], target);
		sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i];
		p_enq_op->pos++;
	}

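	/* flush the per-worker batches to their worker queue pairs */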
	processed_ops_pri = rte_cryptodev_enqueue_burst(
			psd_qp_ctx->primary_worker.dev_id,
			psd_qp_ctx->primary_worker.qp_id,
			sched_ops[PRIMARY_WORKER_IDX],
			enq_ops[PRIMARY_WORKER_IDX].pos);
	/* enqueue shall not fail as the worker queue is monitored */
	RTE_ASSERT(processed_ops_pri == enq_ops[PRIMARY_WORKER_IDX].pos);

	psd_qp_ctx->primary_worker.nb_inflight_cops += processed_ops_pri;

	processed_ops_sec = rte_cryptodev_enqueue_burst(
			psd_qp_ctx->secondary_worker.dev_id,
			psd_qp_ctx->secondary_worker.qp_id,
			sched_ops[SECONDARY_WORKER_IDX],
			enq_ops[SECONDARY_WORKER_IDX].pos);
	RTE_ASSERT(processed_ops_sec == enq_ops[SECONDARY_WORKER_IDX].pos);

	psd_qp_ctx->secondary_worker.nb_inflight_cops += processed_ops_sec;

	return processed_ops_pri + processed_ops_sec;
}

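/**
 * Enqueue with reordering enabled: limit the burst to the free space in the
 * order ring, enqueue as usual, then record the enqueued ops in the order
 * ring so they can be drained in their original order on dequeue.
 */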
static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
			nb_ops);
	uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
			nb_ops_to_enq);

	scheduler_order_insert(order_ring, ops, nb_ops_enqd);

	return nb_ops_enqd;
}

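/**
 * Dequeue from the two workers in alternating order: deq_idx selects which
 * worker is polled first and is flipped after the first poll so the other
 * worker leads on the next call. If the first worker does not fill the
 * burst, the remaining slots are filled from the other worker.
 */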
static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct psd_scheduler_qp_ctx *qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct scheduler_worker *workers[NB_PKT_SIZE_WORKERS] = {
			&qp_ctx->primary_worker, &qp_ctx->secondary_worker};
	struct scheduler_worker *worker = workers[qp_ctx->deq_idx];
	uint16_t nb_deq_ops_pri = 0, nb_deq_ops_sec = 0;

	if (worker->nb_inflight_cops) {
		nb_deq_ops_pri = rte_cryptodev_dequeue_burst(worker->dev_id,
			worker->qp_id, ops, nb_ops);
		scheduler_retrieve_sessions(ops, nb_deq_ops_pri);
		worker->nb_inflight_cops -= nb_deq_ops_pri;
	}

	qp_ctx->deq_idx = (~qp_ctx->deq_idx) & WORKER_IDX_SWITCH_MASK;

	if (nb_deq_ops_pri == nb_ops)
		return nb_deq_ops_pri;

	worker = workers[qp_ctx->deq_idx];

	if (worker->nb_inflight_cops) {
		nb_deq_ops_sec = rte_cryptodev_dequeue_burst(worker->dev_id,
				worker->qp_id, &ops[nb_deq_ops_pri],
				nb_ops - nb_deq_ops_pri);
		scheduler_retrieve_sessions(&ops[nb_deq_ops_pri], nb_deq_ops_sec);
		worker->nb_inflight_cops -= nb_deq_ops_sec;

		if (!worker->nb_inflight_cops)
			qp_ctx->deq_idx = (~qp_ctx->deq_idx) &
					WORKER_IDX_SWITCH_MASK;
	}

	return nb_deq_ops_pri + nb_deq_ops_sec;
}

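/**
 * Dequeue with reordering enabled: pull completed ops from the workers,
 * then drain them from the order ring in their original enqueue order.
 */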
static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;

	schedule_dequeue(qp, ops, nb_ops);

	return scheduler_order_drain(order_ring, ops, nb_ops);
}

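/* worker attach/detach need no extra bookkeeping for this mode; only the
 * first two attached workers are used (see scheduler_start()).
 */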
static int
worker_attach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t worker_id)
{
	return 0;
}

static int
worker_detach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t worker_id)
{
	return 0;
}

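/**
 * Wire each queue pair to the primary and secondary workers, copy the
 * configured threshold into the per-qp context, and install the enqueue and
 * dequeue handlers (the ordering variants when reordering is enabled).
 */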
static int
scheduler_start(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct psd_scheduler_ctx *psd_ctx = sched_ctx->private_ctx;
	uint16_t i;

	/* the packet size based scheduler requires at least two workers */
	if (sched_ctx->nb_workers < NB_PKT_SIZE_WORKERS) {
		CR_SCHED_LOG(ERR, "not enough workers to start");
		return -1;
	}

	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
		struct psd_scheduler_qp_ctx *ps_qp_ctx =
				qp_ctx->private_qp_ctx;

		ps_qp_ctx->primary_worker.dev_id =
				sched_ctx->workers[PRIMARY_WORKER_IDX].dev_id;
		ps_qp_ctx->primary_worker.qp_id = i;
		ps_qp_ctx->primary_worker.nb_inflight_cops = 0;

		ps_qp_ctx->secondary_worker.dev_id =
				sched_ctx->workers[SECONDARY_WORKER_IDX].dev_id;
		ps_qp_ctx->secondary_worker.qp_id = i;
		ps_qp_ctx->secondary_worker.nb_inflight_cops = 0;

		ps_qp_ctx->threshold = psd_ctx->threshold;
	}

	if (sched_ctx->reordering_enabled) {
		dev->enqueue_burst = &schedule_enqueue_ordering;
		dev->dequeue_burst = &schedule_dequeue_ordering;
	} else {
		dev->enqueue_burst = &schedule_enqueue;
		dev->dequeue_burst = &schedule_dequeue;
	}

	return 0;
}

static int
scheduler_stop(struct rte_cryptodev *dev)
{
	uint16_t i;

	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
		struct psd_scheduler_qp_ctx *ps_qp_ctx = qp_ctx->private_qp_ctx;

		if (ps_qp_ctx->primary_worker.nb_inflight_cops +
				ps_qp_ctx->secondary_worker.nb_inflight_cops) {
			CR_SCHED_LOG(ERR, "Some crypto ops left in worker queue");
			return -1;
		}
	}

	return 0;
}

static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
	struct psd_scheduler_qp_ctx *ps_qp_ctx;

	ps_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*ps_qp_ctx), 0,
			rte_socket_id());
	if (!ps_qp_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory for private queue pair");
		return -ENOMEM;
	}

	qp_ctx->private_qp_ctx = (void *)ps_qp_ctx;

	return 0;
}

static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct psd_scheduler_ctx *psd_ctx;

	if (sched_ctx->private_ctx) {
		rte_free(sched_ctx->private_ctx);
		sched_ctx->private_ctx = NULL;
	}

	psd_ctx = rte_zmalloc_socket(NULL, sizeof(struct psd_scheduler_ctx), 0,
			rte_socket_id());
	if (!psd_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory");
		return -ENOMEM;
	}

	psd_ctx->threshold = DEF_PKT_SIZE_THRESHOLD;

	sched_ctx->private_ctx = (void *)psd_ctx;

	return 0;
}
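
/**
 * Set the packet size threshold. The value must be a power of two and is
 * stored internally as the mask ~(threshold - 1), which lets the enqueue
 * path classify a job with a single AND against its length.
 */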
static int
scheduler_option_set(struct rte_cryptodev *dev, uint32_t option_type,
		void *option)
{
	struct psd_scheduler_ctx *psd_ctx = ((struct scheduler_ctx *)
			dev->data->dev_private)->private_ctx;
	uint32_t threshold;

	if ((enum rte_cryptodev_schedule_option_type)option_type !=
			CDEV_SCHED_OPTION_THRESHOLD) {
		CR_SCHED_LOG(ERR, "Option not supported");
		return -EINVAL;
	}

	threshold = ((struct rte_cryptodev_scheduler_threshold_option *)
			option)->threshold;
	if (!rte_is_power_of_2(threshold)) {
		CR_SCHED_LOG(ERR, "Threshold is not a power of 2");
		return -EINVAL;
	}

	psd_ctx->threshold = ~(threshold - 1);

	return 0;
}

static int
scheduler_option_get(struct rte_cryptodev *dev, uint32_t option_type,
		void *option)
{
	struct psd_scheduler_ctx *psd_ctx = ((struct scheduler_ctx *)
			dev->data->dev_private)->private_ctx;
	struct rte_cryptodev_scheduler_threshold_option *threshold_option;

	if ((enum rte_cryptodev_schedule_option_type)option_type !=
			CDEV_SCHED_OPTION_THRESHOLD) {
		CR_SCHED_LOG(ERR, "Option not supported");
		return -EINVAL;
	}

	threshold_option = option;
	threshold_option->threshold = (~psd_ctx->threshold) + 1;

	return 0;
}

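/* callbacks in the field order of struct rte_cryptodev_scheduler_ops */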
static struct rte_cryptodev_scheduler_ops scheduler_ps_ops = {
	worker_attach,
	worker_detach,
	scheduler_start,
	scheduler_stop,
	scheduler_config_qp,
	scheduler_create_private_ctx,
	scheduler_option_set,
	scheduler_option_get
};

static struct rte_cryptodev_scheduler psd_scheduler = {
		.name = "packet-size-based-scheduler",
		.description = "scheduler which will distribute crypto op "
				"burst based on the packet size",
		.mode = CDEV_SCHED_MODE_PKT_SIZE_DISTR,
		.ops = &scheduler_ps_ops
};

struct rte_cryptodev_scheduler *crypto_scheduler_pkt_size_based_distr = &psd_scheduler;