xref: /spdk/module/accel/dpdk_cryptodev/accel_dpdk_cryptodev.c (revision 1db33a8f747262c3b3643451fa1358d862c49b84)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   Copyright (c) 2022, 2023 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "accel_dpdk_cryptodev.h"
8 
9 #include "spdk/accel.h"
10 #include "spdk/accel_module.h"
11 #include "spdk/env.h"
12 #include "spdk/likely.h"
13 #include "spdk/thread.h"
14 #include "spdk/util.h"
15 #include "spdk/log.h"
16 #include "spdk/json.h"
17 #include "spdk_internal/sgl.h"
18 
19 #include <rte_bus_vdev.h>
20 #include <rte_crypto.h>
21 #include <rte_cryptodev.h>
22 #include <rte_mbuf_dyn.h>
23 #include <rte_version.h>
24 
25 /* The VF spread is the number of queue pairs between virtual functions; we use this to
26  * load balance the QAT device.
27  */
28 #define ACCEL_DPDK_CRYPTODEV_QAT_VF_SPREAD		32
29 
30 /* This controls how many ops will be dequeued from the crypto driver in one run
31  * of the poller. It is mainly a performance knob as it effectively determines how
32  * much work the poller has to do.  However even that can vary between crypto drivers
33  * as the ACCEL_DPDK_CRYPTODEV_AESNI_MB driver for example does all the crypto work on dequeue whereas the
34  * QAT driver just dequeues what has been completed already.
35  */
36 #define ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE	64
37 
38 #define ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE (128)
39 
40 /* The number of mbufs we need must be a power of two, and to support other small IOs
41  * in addition to the limits mentioned above, we go to the next power of two. It is a
42  * big number because one mempool serves both source and destination mbufs. It may
43  * need to be bigger to support multiple crypto drivers at once.
44  */
45 #define ACCEL_DPDK_CRYPTODEV_NUM_MBUFS			32768
46 #define ACCEL_DPDK_CRYPTODEV_POOL_CACHE_SIZE		256
47 #define ACCEL_DPDK_CRYPTODEV_MAX_CRYPTO_VOLUMES		128
48 #define ACCEL_DPDK_CRYPTODEV_NUM_SESSIONS		(2 * ACCEL_DPDK_CRYPTODEV_MAX_CRYPTO_VOLUMES)
49 #define ACCEL_DPDK_CRYPTODEV_SESS_MEMPOOL_CACHE_SIZE	0
50 
51 /* This is the max number of IOs we can supply to any crypto device QP at one time.
52  * It can vary between drivers.
53  */
54 #define ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS		2048
55 
56 /* At this moment DPDK descriptors allocation for mlx5 has some issues. We use 512
57  * as a compromise value between performance and the time spent for initialization. */
58 #define ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS_MLX5	512
59 
60 #define ACCEL_DPDK_CRYPTODEV_AESNI_MB_NUM_QP		64
61 
62 /* Common for supported devices. */
63 #define ACCEL_DPDK_CRYPTODEV_DEFAULT_NUM_XFORMS		2
64 #define ACCEL_DPDK_CRYPTODEV_IV_OFFSET (sizeof(struct rte_crypto_op) + \
65                 sizeof(struct rte_crypto_sym_op) + \
66                 (ACCEL_DPDK_CRYPTODEV_DEFAULT_NUM_XFORMS * \
67                  sizeof(struct rte_crypto_sym_xform)))
68 #define ACCEL_DPDK_CRYPTODEV_IV_LENGTH			16
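/* For illustration, the per-op layout implied by the offsets above (a sketch, not an
 * additional definition): each op allocated from the crypto op pool is laid out as
 *   [rte_crypto_op][rte_crypto_sym_op][2 x rte_crypto_sym_xform][16-byte IV]
 * and ACCEL_DPDK_CRYPTODEV_IV_OFFSET points at the start of that 16-byte IV area. */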
69 
70 /* Driver names */
71 #define ACCEL_DPDK_CRYPTODEV_AESNI_MB	"crypto_aesni_mb"
72 #define ACCEL_DPDK_CRYPTODEV_QAT	"crypto_qat"
73 #define ACCEL_DPDK_CRYPTODEV_QAT_ASYM	"crypto_qat_asym"
74 #define ACCEL_DPDK_CRYPTODEV_MLX5	"mlx5_pci"
75 
76 /* Supported ciphers */
77 #define ACCEL_DPDK_CRYPTODEV_AES_CBC	"AES_CBC" /* QAT and ACCEL_DPDK_CRYPTODEV_AESNI_MB */
78 #define ACCEL_DPDK_CRYPTODEV_AES_XTS	"AES_XTS" /* QAT and MLX5 */
79 
80 /* Specific to AES_CBC. */
81 #define ACCEL_DPDK_CRYPTODEV_AES_CBC_KEY_LENGTH			16
82 #define ACCEL_DPDK_CRYPTODEV_AES_XTS_128_BLOCK_KEY_LENGTH	16 /* AES-XTS-128 block key size. */
83 #define ACCEL_DPDK_CRYPTODEV_AES_XTS_256_BLOCK_KEY_LENGTH	32 /* AES-XTS-256 block key size. */
84 #define ACCEL_DPDK_CRYPTODEV_AES_XTS_512_BLOCK_KEY_LENGTH	64 /* AES-XTS-512 block key size. */
85 
86 #define ACCEL_DPDK_CRYPTODEV_AES_XTS_TWEAK_KEY_LENGTH		16 /* XTS part key size is always 128 bit. */
87 
88 /* Limit on the max memory length attached to an mbuf - rte_pktmbuf_attach_extbuf has a uint16_t `buf_len`
89  * parameter, so we use the closest aligned value, 32768, for better performance */
90 #define ACCEL_DPDK_CRYPTODEV_MAX_MBUF_LEN			32768
91 
92 /* Used to store IO context in mbuf */
93 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
94 	.name = "context_accel_dpdk_cryptodev",
95 	.size = sizeof(uint64_t),
96 	.align = __alignof__(uint64_t),
97 	.flags = 0,
98 };
99 
100 struct accel_dpdk_cryptodev_device;
101 
102 enum accel_dpdk_cryptodev_driver_type {
103 	ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB = 0,
104 	ACCEL_DPDK_CRYPTODEV_DRIVER_QAT,
105 	ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI,
106 	ACCEL_DPDK_CRYPTODEV_DRIVER_LAST
107 };
108 
109 enum accel_dpdk_crypto_dev_cipher_type {
110 	ACCEL_DPDK_CRYPTODEV_CIPHER_AES_CBC,
111 	ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS
112 };
113 
114 struct accel_dpdk_cryptodev_qp {
115 	struct accel_dpdk_cryptodev_device *device;	/* ptr to crypto device */
116 	uint32_t num_enqueued_ops;	/* Used to decide whether to poll the qp or not */
117 	uint8_t qp; /* queue identifier */
118 	bool in_use; /* whether this node is in use or not */
119 	uint8_t index; /* used by QAT to load balance placement of qpairs */
120 	TAILQ_ENTRY(accel_dpdk_cryptodev_qp) link;
121 };
122 
123 struct accel_dpdk_cryptodev_device {
124 	enum accel_dpdk_cryptodev_driver_type type;
125 	struct rte_cryptodev_info cdev_info; /* includes DPDK device friendly name */
126 	uint32_t qp_desc_nr; /* max number of qp descriptors to be enqueued in burst */
127 	uint8_t cdev_id; /* identifier for the device */
128 	TAILQ_HEAD(, accel_dpdk_cryptodev_qp) qpairs;
129 	TAILQ_ENTRY(accel_dpdk_cryptodev_device) link;
130 };
131 
132 struct accel_dpdk_cryptodev_key_handle {
133 	struct accel_dpdk_cryptodev_device *device;
134 	TAILQ_ENTRY(accel_dpdk_cryptodev_key_handle) link;
135 	void *session_encrypt;	/* encryption session for this key */
136 	void *session_decrypt;	/* decryption session for this key */
137 	struct rte_crypto_sym_xform cipher_xform;		/* crypto control struct for this key */
138 };
139 
140 struct accel_dpdk_cryptodev_key_priv {
141 	enum accel_dpdk_cryptodev_driver_type driver;
142 	enum accel_dpdk_crypto_dev_cipher_type cipher;
143 	char *xts_key;
144 	TAILQ_HEAD(, accel_dpdk_cryptodev_key_handle) dev_keys;
145 };
146 
147 /* The crypto channel struct. It is allocated and freed on our behalf by the io channel code.
148  * We store things in here that are needed on a per thread basis, like the device qpairs assigned
149  * to this thread and the poller for this thread.
150  */
151 struct accel_dpdk_cryptodev_io_channel {
152 	/* completion poller */
153 	struct spdk_poller *poller;
154 	/* Array of qpairs, one per supported driver type. The specific qpair is selected depending on the crypto key */
155 	struct accel_dpdk_cryptodev_qp *device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_LAST];
156 	/* Used to queue tasks when the qpair is full or only part of the crypto ops was submitted to the PMD */
157 	TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks;
158 	/* Used to queue tasks that were completed in the submission path - to avoid calling cpl_cb and possibly
159 	 * overflowing the call stack */
160 	TAILQ_HEAD(, accel_dpdk_cryptodev_task) completed_tasks;
161 };
162 
163 struct accel_dpdk_cryptodev_task {
164 	struct spdk_accel_task base;
165 	uint32_t cryop_completed;	/* The number of crypto operations completed by HW */
166 	uint32_t cryop_submitted;	/* The number of crypto operations submitted to HW */
167 	uint32_t cryop_total;		/* Total number of crypto operations in this task */
168 	bool is_failed;
169 	bool inplace;
170 	TAILQ_ENTRY(accel_dpdk_cryptodev_task) link;
171 };
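/* For example (illustrative numbers only): a 16 KiB encrypt task with a 4 KiB block_size is
 * split into cryop_total = 4 crypto operations, one per block, where operation N uses
 * IV = base.iv + N. cryop_submitted and cryop_completed track how far the task has progressed
 * when it cannot be submitted or completed in a single pass. */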
172 
173 /* Shared mempools between all devices on this system */
174 static struct rte_mempool *g_session_mp = NULL;
175 static struct rte_mempool *g_session_mp_priv = NULL;
176 static struct rte_mempool *g_mbuf_mp = NULL;            /* mbuf mempool */
177 static int g_mbuf_offset;
178 static struct rte_mempool *g_crypto_op_mp = NULL;	/* crypto operations, must be rte* mempool */
179 
180 static struct rte_mbuf_ext_shared_info g_shinfo = {};   /* used by DPDK mbuf macro */
181 
182 static uint8_t g_qat_total_qp = 0;
183 static uint8_t g_next_qat_index;
184 
185 static const char *g_driver_names[] = {
186 	[ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB]	= ACCEL_DPDK_CRYPTODEV_AESNI_MB,
187 	[ACCEL_DPDK_CRYPTODEV_DRIVER_QAT]	= ACCEL_DPDK_CRYPTODEV_QAT,
188 	[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI]	= ACCEL_DPDK_CRYPTODEV_MLX5
189 };
190 static const char *g_cipher_names[] = {
191 	[ACCEL_DPDK_CRYPTODEV_CIPHER_AES_CBC]	= ACCEL_DPDK_CRYPTODEV_AES_CBC,
192 	[ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS]	= ACCEL_DPDK_CRYPTODEV_AES_XTS,
193 };
194 
195 static enum accel_dpdk_cryptodev_driver_type g_dpdk_cryptodev_driver =
196 	ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
197 
198 /* Global list of all crypto devices */
199 static TAILQ_HEAD(, accel_dpdk_cryptodev_device) g_crypto_devices = TAILQ_HEAD_INITIALIZER(
200 			g_crypto_devices);
201 static pthread_mutex_t g_device_lock = PTHREAD_MUTEX_INITIALIZER;
202 
203 static struct spdk_accel_module_if g_accel_dpdk_cryptodev_module;
204 
205 static int accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto_ch,
206 		struct accel_dpdk_cryptodev_task *task);
207 
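/* Register this module with the accel framework. In SPDK this is typically driven by the
 * dpdk_cryptodev_scan_accel_module RPC (see accel_dpdk_cryptodev_write_config_json() below). */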
208 void
209 accel_dpdk_cryptodev_enable(void)
210 {
211 	spdk_accel_module_list_add(&g_accel_dpdk_cryptodev_module);
212 }
213 
214 int
215 accel_dpdk_cryptodev_set_driver(const char *driver_name)
216 {
217 	if (strcmp(driver_name, ACCEL_DPDK_CRYPTODEV_QAT) == 0) {
218 		g_dpdk_cryptodev_driver = ACCEL_DPDK_CRYPTODEV_DRIVER_QAT;
219 	} else if (strcmp(driver_name, ACCEL_DPDK_CRYPTODEV_AESNI_MB) == 0) {
220 		g_dpdk_cryptodev_driver = ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
221 	} else if (strcmp(driver_name, ACCEL_DPDK_CRYPTODEV_MLX5) == 0) {
222 		g_dpdk_cryptodev_driver = ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI;
223 	} else {
224 		SPDK_ERRLOG("Unsupported driver %s\n", driver_name);
225 		return -EINVAL;
226 	}
227 
228 	SPDK_NOTICELOG("Using driver %s\n", driver_name);
229 
230 	return 0;
231 }
232 
233 const char *
234 accel_dpdk_cryptodev_get_driver(void)
235 {
236 	return g_driver_names[g_dpdk_cryptodev_driver];
237 }
238 
239 static inline uint16_t
240 accel_dpdk_cryptodev_poll_qp(struct accel_dpdk_cryptodev_qp *qp,
241 			     struct accel_dpdk_cryptodev_io_channel *crypto_ch)
242 {
243 	struct rte_crypto_op *dequeued_ops[ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE];
244 	struct rte_mbuf *mbufs_to_free[2 * ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE];
245 	struct accel_dpdk_cryptodev_task *task;
246 	uint32_t num_mbufs = 0;
247 	int i;
248 	uint16_t num_dequeued_ops;
249 
250 	/* Each run of the poller will get just what the device has available
251 	 * at the moment we call it; we don't check again after draining the
252 	 * first batch.
253 	 */
254 	num_dequeued_ops = rte_cryptodev_dequeue_burst(qp->device->cdev_id, qp->qp,
255 			   dequeued_ops, ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE);
256 	/* Check if operation was processed successfully */
257 	for (i = 0; i < num_dequeued_ops; i++) {
258 
259 		/* We don't know the order or association of the crypto ops wrt any
260 		 * particular task, so we need to look at each one and determine if it is
261 		 * the last one for its task or not.
262 		 */
263 		task = (struct accel_dpdk_cryptodev_task *)*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src,
264 				g_mbuf_offset, uint64_t *);
265 		assert(task != NULL);
266 
267 		if (dequeued_ops[i]->status != RTE_CRYPTO_OP_STATUS_SUCCESS) {
268 			SPDK_ERRLOG("error with op %d status %u\n", i, dequeued_ops[i]->status);
269 			/* Update the task status to error, we'll still process the
270 			 * rest of the crypto ops for this task though so they
271 			 * aren't left hanging.
272 			 */
273 			task->is_failed = true;
274 		}
275 
276 		/* Return the associated src and dst mbufs by collecting them into
277 		 * an array that we can use the bulk API to free after the loop.
278 		 */
279 		*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset, uint64_t *) = 0;
280 		mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_src;
281 		if (dequeued_ops[i]->sym->m_dst) {
282 			mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_dst;
283 		}
284 
285 		task->cryop_completed++;
286 		if (task->cryop_completed == task->cryop_total) {
287 			/* Complete the IO */
288 			spdk_accel_task_complete(&task->base, task->is_failed ? -EINVAL : 0);
289 		} else if (task->cryop_completed == task->cryop_submitted) {
290 			/* submit remaining crypto ops */
291 			int rc = accel_dpdk_cryptodev_process_task(crypto_ch, task);
292 
293 			if (spdk_unlikely(rc)) {
294 				if (rc == -ENOMEM) {
295 					TAILQ_INSERT_TAIL(&crypto_ch->queued_tasks, task, link);
296 					continue;
297 				} else if (rc == -EALREADY) {
298 					/* -EALREADY means that a task is completed, but it might be unsafe to complete
299 					 * it if we are in the submission path. Since we are in the poller context, we can
300 					 * complete the task immediately */
301 					rc = 0;
302 				}
303 				spdk_accel_task_complete(&task->base, rc);
304 			}
305 		}
306 	}
307 
308 	/* Now bulk free both mbufs and crypto operations. */
309 	if (num_dequeued_ops > 0) {
310 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)dequeued_ops, num_dequeued_ops);
311 		assert(num_mbufs > 0);
312 		/* This also releases chained mbufs if any. */
313 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
314 	}
315 
316 	assert(qp->num_enqueued_ops >= num_dequeued_ops);
317 	qp->num_enqueued_ops -= num_dequeued_ops;
318 
319 	return num_dequeued_ops;
320 }
321 
322 /* This is the poller for the crypto module. It uses a single API to dequeue whatever is ready at
323  * the device. Then we need to decide if what we've got so far (including previous poller
324  * runs) totals up to one or more complete tasks */
325 static int
326 accel_dpdk_cryptodev_poller(void *args)
327 {
328 	struct accel_dpdk_cryptodev_io_channel *crypto_ch = args;
329 	struct accel_dpdk_cryptodev_qp *qp;
330 	struct accel_dpdk_cryptodev_task *task, *task_tmp;
331 	TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks_tmp;
332 	uint32_t num_dequeued_ops = 0, num_enqueued_ops = 0, num_completed_tasks = 0;
333 	int i, rc;
334 
335 	for (i = 0; i < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST; i++) {
336 		qp = crypto_ch->device_qp[i];
337 		/* Avoid polling "idle" qps since it may affect performance */
338 		if (qp && qp->num_enqueued_ops) {
339 			num_dequeued_ops += accel_dpdk_cryptodev_poll_qp(qp, crypto_ch);
340 		}
341 	}
342 
343 	if (!TAILQ_EMPTY(&crypto_ch->queued_tasks)) {
344 		TAILQ_INIT(&queued_tasks_tmp);
345 
346 		TAILQ_FOREACH_SAFE(task, &crypto_ch->queued_tasks, link, task_tmp) {
347 			TAILQ_REMOVE(&crypto_ch->queued_tasks, task, link);
348 			rc = accel_dpdk_cryptodev_process_task(crypto_ch, task);
349 			if (spdk_unlikely(rc)) {
350 				if (rc == -ENOMEM) {
351 					TAILQ_INSERT_TAIL(&queued_tasks_tmp, task, link);
352 					/* Other queued tasks may belong to other qpairs,
353 					 * so process the whole list */
354 					continue;
355 				} else if (rc == -EALREADY) {
356 					/* -EALREADY means that a task is completed, but it might be unsafe to complete
357 					 * it if we are in the submission path. Since we are in the poller context, we can
358 					 * complete the task immediately */
359 					rc = 0;
360 				}
361 				spdk_accel_task_complete(&task->base, rc);
362 				num_completed_tasks++;
363 			} else {
364 				num_enqueued_ops++;
365 			}
366 		}
367 
368 		TAILQ_SWAP(&crypto_ch->queued_tasks, &queued_tasks_tmp, accel_dpdk_cryptodev_task, link);
369 	}
370 
371 	TAILQ_FOREACH_SAFE(task, &crypto_ch->completed_tasks, link, task_tmp) {
372 		TAILQ_REMOVE(&crypto_ch->completed_tasks, task, link);
373 		spdk_accel_task_complete(&task->base, 0);
374 		num_completed_tasks++;
375 	}
376 
377 	return !!(num_dequeued_ops + num_enqueued_ops + num_completed_tasks);
378 }
379 
380 /* Allocate a new mbuf of @remainder size with data pointed to by @addr and chain
381  * it to @orig_mbuf. */
382 static inline int
383 accel_dpdk_cryptodev_mbuf_chain_remainder(struct accel_dpdk_cryptodev_task *task,
384 		struct rte_mbuf *orig_mbuf, uint8_t *addr, uint64_t *_remainder)
385 {
386 	uint64_t phys_addr, phys_len, remainder = *_remainder;
387 	struct rte_mbuf *chain_mbuf;
388 	int rc;
389 
390 	phys_len = remainder;
391 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
392 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR)) {
393 		return -EFAULT;
394 	}
395 	remainder = spdk_min(remainder, phys_len);
396 	remainder = spdk_min(remainder, ACCEL_DPDK_CRYPTODEV_MAX_MBUF_LEN);
397 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&chain_mbuf, 1);
398 	if (spdk_unlikely(rc)) {
399 		return -ENOMEM;
400 	}
401 	/* Store context in every mbuf as we don't know anything about completion order */
402 	*RTE_MBUF_DYNFIELD(chain_mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)task;
403 	rte_pktmbuf_attach_extbuf(chain_mbuf, addr, phys_addr, remainder, &g_shinfo);
404 	rte_pktmbuf_append(chain_mbuf, remainder);
405 
406 	/* Chained buffer is released by rte_pktmbuf_free_bulk() automagically. */
407 	rte_pktmbuf_chain(orig_mbuf, chain_mbuf);
408 	*_remainder = remainder;
409 
410 	return 0;
411 }
412 
413 /* Attach the data buffer pointed to by @addr to @mbuf. Return the utilized length of the
414  * contiguous space that was physically available. */
415 static inline uint64_t
416 accel_dpdk_cryptodev_mbuf_attach_buf(struct accel_dpdk_cryptodev_task *task, struct rte_mbuf *mbuf,
417 				     uint8_t *addr, uint32_t len)
418 {
419 	uint64_t phys_addr, phys_len;
420 
421 	/* Store context in every mbuf as we don't know anything about completion order */
422 	*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)task;
423 
424 	phys_len = len;
425 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
426 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len == 0)) {
427 		return 0;
428 	}
429 	assert(phys_len <= len);
430 	phys_len = spdk_min(phys_len, ACCEL_DPDK_CRYPTODEV_MAX_MBUF_LEN);
431 
432 	/* Set the mbuf elements address and length. */
433 	rte_pktmbuf_attach_extbuf(mbuf, addr, phys_addr, phys_len, &g_shinfo);
434 	rte_pktmbuf_append(mbuf, phys_len);
435 
436 	return phys_len;
437 }
438 
439 static inline struct accel_dpdk_cryptodev_key_handle *
440 accel_dpdk_find_key_handle_in_channel(struct accel_dpdk_cryptodev_io_channel *crypto_ch,
441 				      struct accel_dpdk_cryptodev_key_priv *key)
442 {
443 	struct accel_dpdk_cryptodev_key_handle *key_handle;
444 
445 	if (key->driver == ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI) {
446 		/* Crypto key is registered on all available devices while io_channel opens CQ/QP on a single device.
447 		 * We need to iterate a list of key entries to find a suitable device */
448 		TAILQ_FOREACH(key_handle, &key->dev_keys, link) {
449 			if (key_handle->device->cdev_id ==
450 			    crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI]->device->cdev_id) {
451 				return key_handle;
452 			}
453 		}
454 		return NULL;
455 	} else {
456 		return TAILQ_FIRST(&key->dev_keys);
457 	}
458 }
459 
460 static inline int
461 accel_dpdk_cryptodev_task_alloc_resources(struct rte_mbuf **src_mbufs, struct rte_mbuf **dst_mbufs,
462 		struct rte_crypto_op **crypto_ops, int count)
463 {
464 	int rc;
465 
466 	/* Get the number of source mbufs that we need. These will always be 1:1 because we
467 	 * don't support chaining. The reason we don't is because of our decision to use
468 	 * LBA as IV; there can be no case where we'd need >1 mbuf per crypto op or the
469 	 * op would be > 1 LBA.
470 	 */
471 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, count);
472 	if (rc) {
473 		SPDK_ERRLOG("Failed to get src_mbufs!\n");
474 		return -ENOMEM;
475 	}
476 
477 	/* Allocate the same amount to describe the destination. If the crypto operation is inplace, dst_mbufs is NULL and we skip this */
478 	if (dst_mbufs) {
479 		rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, dst_mbufs, count);
480 		if (rc) {
481 			SPDK_ERRLOG("Failed to get dst_mbufs!\n");
482 			goto err_free_src;
483 		}
484 	}
485 
486 #ifdef __clang_analyzer__
487 	/* silence scan-build false positive */
488 	SPDK_CLANG_ANALYZER_PREINIT_PTR_ARRAY(crypto_ops, ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE,
489 					      0x1000);
490 #endif
491 	/* Allocate crypto operations. */
492 	rc = rte_crypto_op_bulk_alloc(g_crypto_op_mp,
493 				      RTE_CRYPTO_OP_TYPE_SYMMETRIC,
494 				      crypto_ops, count);
495 	if (rc < count) {
496 		SPDK_ERRLOG("Failed to allocate crypto ops! rc %d\n", rc);
497 		goto err_free_ops;
498 	}
499 
500 	return 0;
501 
502 err_free_ops:
503 	if (rc > 0) {
504 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops, rc);
505 	}
506 	if (dst_mbufs) {
507 		/* This also releases chained mbufs if any. */
508 		rte_pktmbuf_free_bulk(dst_mbufs, count);
509 	}
510 err_free_src:
511 	/* This also releases chained mbufs if any. */
512 	rte_pktmbuf_free_bulk(src_mbufs, count);
513 
514 	return -ENOMEM;
515 }
516 
517 static inline int
518 accel_dpdk_cryptodev_mbuf_add_single_block(struct spdk_iov_sgl *sgl, struct rte_mbuf *mbuf,
519 		struct accel_dpdk_cryptodev_task *task)
520 {
521 	int rc;
522 	uint8_t *buf_addr;
523 	uint64_t phys_len;
524 	uint64_t remainder;
525 	uint64_t buf_len;
526 
527 	assert(sgl->iov->iov_len > sgl->iov_offset);
528 	buf_len = spdk_min(task->base.block_size, sgl->iov->iov_len - sgl->iov_offset);
529 	buf_addr = sgl->iov->iov_base + sgl->iov_offset;
530 	phys_len = accel_dpdk_cryptodev_mbuf_attach_buf(task, mbuf, buf_addr, buf_len);
531 	if (spdk_unlikely(phys_len == 0)) {
532 		return -EFAULT;
533 	}
534 	buf_len = spdk_min(buf_len, phys_len);
535 	spdk_iov_sgl_advance(sgl, buf_len);
536 
537 	/* Handle the case where the block crosses a physical page or iov boundary: chain the remainder in extra mbufs. */
538 	assert(task->base.block_size >= buf_len);
539 	remainder = task->base.block_size - buf_len;
540 	while (remainder) {
541 		buf_len = spdk_min(remainder, sgl->iov->iov_len - sgl->iov_offset);
542 		buf_addr = sgl->iov->iov_base + sgl->iov_offset;
543 		rc = accel_dpdk_cryptodev_mbuf_chain_remainder(task, mbuf, buf_addr, &buf_len);
544 		if (spdk_unlikely(rc)) {
545 			return rc;
546 		}
547 		spdk_iov_sgl_advance(sgl, buf_len);
548 		remainder -= buf_len;
549 	}
550 
551 	return 0;
552 }
553 
554 static inline void
555 accel_dpdk_cryptodev_op_set_iv(struct rte_crypto_op *crypto_op, uint64_t iv)
556 {
557 	uint8_t *iv_ptr = rte_crypto_op_ctod_offset(crypto_op, uint8_t *, ACCEL_DPDK_CRYPTODEV_IV_OFFSET);
558 
559 	/* Set the IV - we use the LBA of the crypto_op */
560 	memset(iv_ptr, 0, ACCEL_DPDK_CRYPTODEV_IV_LENGTH);
561 	rte_memcpy(iv_ptr, &iv, sizeof(uint64_t));
562 }
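/* A minimal sketch of the IV produced above (assuming a little-endian host): for iv = 5 the
 * 16-byte IV buffer becomes 05 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00, i.e. the
 * 64-bit LBA in the low bytes and zero padding in the rest. */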
563 
564 static inline void
565 accel_dpdk_cryptodev_update_resources_from_pools(struct rte_crypto_op **crypto_ops,
566 		struct rte_mbuf **src_mbufs, struct rte_mbuf **dst_mbufs,
567 		uint32_t num_enqueued_ops, uint32_t cryop_cnt)
568 {
569 	memmove(crypto_ops, &crypto_ops[num_enqueued_ops], sizeof(crypto_ops[0]) * cryop_cnt);
570 	memmove(src_mbufs, &src_mbufs[num_enqueued_ops], sizeof(src_mbufs[0]) * cryop_cnt);
571 	if (dst_mbufs) {
572 		memmove(dst_mbufs, &dst_mbufs[num_enqueued_ops], sizeof(dst_mbufs[0]) * cryop_cnt);
573 	}
574 }
575 
576 static int
577 accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto_ch,
578 				  struct accel_dpdk_cryptodev_task *task)
579 {
580 	uint16_t num_enqueued_ops;
581 	uint32_t cryop_cnt;
582 	uint32_t crypto_len = task->base.block_size;
583 	uint64_t dst_length, total_length;
584 	uint32_t sgl_offset;
585 	uint32_t qp_capacity;
586 	uint64_t iv_start;
587 	uint32_t i, crypto_index;
588 	struct rte_crypto_op *crypto_ops[ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE];
589 	struct rte_mbuf *src_mbufs[ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE];
590 	struct rte_mbuf *dst_mbufs[ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE];
591 	void *session;
592 	struct accel_dpdk_cryptodev_key_priv *priv;
593 	struct accel_dpdk_cryptodev_key_handle *key_handle;
594 	struct accel_dpdk_cryptodev_qp *qp;
595 	struct accel_dpdk_cryptodev_device *dev;
596 	struct spdk_iov_sgl src, dst = {};
597 	int rc;
598 	bool inplace = task->inplace;
599 
600 	if (spdk_unlikely(!task->base.crypto_key ||
601 			  task->base.crypto_key->module_if != &g_accel_dpdk_cryptodev_module)) {
602 		return -EINVAL;
603 	}
604 
605 	priv = task->base.crypto_key->priv;
606 	assert(priv->driver < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST);
607 
608 	if (task->cryop_completed) {
609 		/* We continue to process remaining blocks */
610 		assert(task->cryop_submitted == task->cryop_completed);
611 		assert(task->cryop_total > task->cryop_completed);
612 		cryop_cnt = task->cryop_total - task->cryop_completed;
613 		sgl_offset = task->cryop_completed * crypto_len;
614 		iv_start = task->base.iv + task->cryop_completed;
615 	} else {
616 		/* That is a new task */
617 		total_length = 0;
618 		for (i = 0; i < task->base.s.iovcnt; i++) {
619 			total_length += task->base.s.iovs[i].iov_len;
620 		}
621 		dst_length = 0;
622 		for (i = 0; i < task->base.d.iovcnt; i++) {
623 			dst_length += task->base.d.iovs[i].iov_len;
624 		}
625 
626 		if (spdk_unlikely(total_length != dst_length || !total_length)) {
627 			return -ERANGE;
628 		}
629 		if (spdk_unlikely(total_length % task->base.block_size != 0)) {
630 			return -EINVAL;
631 		}
632 
633 		cryop_cnt = total_length / task->base.block_size;
634 		task->cryop_total = cryop_cnt;
635 		sgl_offset = 0;
636 		iv_start = task->base.iv;
637 	}
638 
639 	/* Limit the number of crypto ops that we can process at once */
640 	cryop_cnt = spdk_min(cryop_cnt, ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE);
641 
642 	qp = crypto_ch->device_qp[priv->driver];
643 	assert(qp);
644 	dev = qp->device;
645 	assert(dev);
646 	assert(dev->qp_desc_nr >= qp->num_enqueued_ops);
647 
648 	qp_capacity = dev->qp_desc_nr - qp->num_enqueued_ops;
649 	cryop_cnt = spdk_min(cryop_cnt, qp_capacity);
650 	if (spdk_unlikely(cryop_cnt == 0)) {
651 		/* QP is full */
652 		return -ENOMEM;
653 	}
654 
655 	key_handle = accel_dpdk_find_key_handle_in_channel(crypto_ch, priv);
656 	if (spdk_unlikely(!key_handle)) {
657 		SPDK_ERRLOG("Failed to find a key handle, driver %s, cipher %s\n", g_driver_names[priv->driver],
658 			    g_cipher_names[priv->cipher]);
659 		return -EINVAL;
660 	}
661 	/* mlx5_pci binds keys to a specific device, we can't use a key with any device */
662 	assert(dev == key_handle->device || priv->driver != ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI);
663 
664 	if (task->base.op_code == ACCEL_OPC_ENCRYPT) {
665 		session = key_handle->session_encrypt;
666 	} else if (task->base.op_code == ACCEL_OPC_DECRYPT) {
667 		session = key_handle->session_decrypt;
668 	} else {
669 		return -EINVAL;
670 	}
671 
672 	rc = accel_dpdk_cryptodev_task_alloc_resources(src_mbufs, inplace ? NULL : dst_mbufs,
673 			crypto_ops, cryop_cnt);
674 	if (rc) {
675 		return rc;
676 	}
677 
678 	/* As we don't support chaining because of a decision to use LBA as IV, construction
679 	 * of crypto operations is straightforward. We build the op, the src mbuf and the
680 	 * dst_mbuf in our local arrays by looping through the length of the accel task and
681 	 * picking off LBA sized blocks of memory from the IOVs as we walk through them. Each
682 	 * LBA sized chunk of memory will correspond 1:1 to a crypto operation and a single
683 	 * mbuf per crypto operation.
684 	 */
685 	spdk_iov_sgl_init(&src, task->base.s.iovs, task->base.s.iovcnt, 0);
686 	spdk_iov_sgl_advance(&src, sgl_offset);
687 	if (!inplace) {
688 		spdk_iov_sgl_init(&dst, task->base.d.iovs, task->base.d.iovcnt, 0);
689 		spdk_iov_sgl_advance(&dst, sgl_offset);
690 	}
691 
692 	for (crypto_index = 0; crypto_index < cryop_cnt; crypto_index++) {
693 		rc = accel_dpdk_cryptodev_mbuf_add_single_block(&src, src_mbufs[crypto_index], task);
694 		if (spdk_unlikely(rc)) {
695 			goto free_ops;
696 		}
697 		accel_dpdk_cryptodev_op_set_iv(crypto_ops[crypto_index], iv_start);
698 		iv_start++;
699 
700 		/* Set the data to encrypt/decrypt length */
701 		crypto_ops[crypto_index]->sym->cipher.data.length = crypto_len;
702 		crypto_ops[crypto_index]->sym->cipher.data.offset = 0;
703 		rte_crypto_op_attach_sym_session(crypto_ops[crypto_index], session);
704 
705 		/* link the mbuf to the crypto op. */
706 		crypto_ops[crypto_index]->sym->m_src = src_mbufs[crypto_index];
707 
708 		if (inplace) {
709 			crypto_ops[crypto_index]->sym->m_dst = NULL;
710 		} else {
711 #ifndef __clang_analyzer__
712 			/* scan-build thinks that dst_mbufs is not initialized */
713 			rc = accel_dpdk_cryptodev_mbuf_add_single_block(&dst, dst_mbufs[crypto_index], task);
714 			if (spdk_unlikely(rc)) {
715 				goto free_ops;
716 			}
717 			crypto_ops[crypto_index]->sym->m_dst = dst_mbufs[crypto_index];
718 #endif
719 		}
720 	}
721 
722 	/* Enqueue everything we've got but limit by the max number of descriptors we
723 	 * configured the crypto device for.
724 	 */
725 	num_enqueued_ops = rte_cryptodev_enqueue_burst(dev->cdev_id, qp->qp, crypto_ops, cryop_cnt);
726 	/* This value is used in the completion callback to determine when the accel task is complete. */
727 	task->cryop_submitted += num_enqueued_ops;
728 	qp->num_enqueued_ops += num_enqueued_ops;
729 	/* If we were unable to enqueue everything, decide what to do based on the
730 	 * status of the first op that was not enqueued.
731 	 */
732 	if (num_enqueued_ops < cryop_cnt) {
733 		switch (crypto_ops[num_enqueued_ops]->status) {
734 		case RTE_CRYPTO_OP_STATUS_SUCCESS:
735 			/* A crypto operation might be completed successfully but enqueuing to the completion ring might fail.
736 			 * That might happen with SW PMDs like openssl.
737 			 * We can't retry such an operation on the next turn since, if the crypto operation was inplace, we could
738 			 * encrypt/decrypt an already processed buffer. See github issue #2907 for more details.
739 			 * Handle this case as if the crypto op was completed successfully - increment cryop_submitted and
740 			 * cryop_completed.
741 			 * We won't receive a completion for such an operation, so we need to clean up mbufs and crypto_ops */
742 			assert(task->cryop_total > task->cryop_completed);
743 			task->cryop_completed++;
744 			task->cryop_submitted++;
745 			if (task->cryop_completed == task->cryop_total) {
746 				assert(num_enqueued_ops == 0);
747 				/* All crypto ops are completed. We can't complete the task immediately since this function might be
748 				 * called in scope of spdk_accel_submit_* function and user's logic in the completion callback
749 				 * might lead to stack overflow */
750 				cryop_cnt -= num_enqueued_ops;
751 				accel_dpdk_cryptodev_update_resources_from_pools(crypto_ops, src_mbufs, inplace ? NULL : dst_mbufs,
752 						num_enqueued_ops, cryop_cnt);
753 				rc = -EALREADY;
754 				goto free_ops;
755 			}
756 		/* fallthrough */
757 		case RTE_CRYPTO_OP_STATUS_NOT_PROCESSED:
758 			if (num_enqueued_ops == 0) {
759 				/* Nothing was submitted. Free crypto ops and mbufs, treat this case as NOMEM */
760 				rc = -ENOMEM;
761 				goto free_ops;
762 			}
763 			/* Part of the crypto operations were not submitted, so release their mbufs and crypto ops.
764 			 * The remaining crypto ops will be submitted again once the current batch is completed */
765 			cryop_cnt -= num_enqueued_ops;
766 			accel_dpdk_cryptodev_update_resources_from_pools(crypto_ops, src_mbufs, inplace ? NULL : dst_mbufs,
767 					num_enqueued_ops, cryop_cnt);
768 			rc = 0;
769 			goto free_ops;
770 		default:
771 			/* For all other statuses, mark task as failed so that the poller will pick
772 			 * the failure up for the overall task status.
773 			 */
774 			task->is_failed = true;
775 			if (num_enqueued_ops == 0) {
776 				/* Nothing was enqueued and the failure wasn't because the device was
777 				 * busy, so fail the task now as the poller won't know anything about it.
778 				 */
779 				rc = -EINVAL;
780 				goto free_ops;
781 			}
782 			break;
783 		}
784 	}
785 
786 	return 0;
787 
788 	/* Error cleanup paths. */
789 free_ops:
790 	if (!inplace) {
791 		/* This also releases chained mbufs if any. */
792 		rte_pktmbuf_free_bulk(dst_mbufs, cryop_cnt);
793 	}
794 	rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops, cryop_cnt);
795 	/* This also releases chained mbufs if any. */
796 	rte_pktmbuf_free_bulk(src_mbufs, cryop_cnt);
797 	return rc;
798 }
799 
800 static inline struct accel_dpdk_cryptodev_qp *
801 accel_dpdk_cryptodev_get_next_device_qpair(enum accel_dpdk_cryptodev_driver_type type)
802 {
803 	struct accel_dpdk_cryptodev_device *device, *device_tmp;
804 	struct accel_dpdk_cryptodev_qp *qpair;
805 
806 	TAILQ_FOREACH_SAFE(device, &g_crypto_devices, link, device_tmp) {
807 		if (device->type != type) {
808 			continue;
809 		}
810 		TAILQ_FOREACH(qpair, &device->qpairs, link) {
811 			if (!qpair->in_use) {
812 				qpair->in_use = true;
813 				return qpair;
814 			}
815 		}
816 	}
817 
818 	return NULL;
819 }
820 
821 /* Helper function for the channel creation callback.
822  * Returns the number of drivers assigned to the channel */
823 static uint32_t
824 accel_dpdk_cryptodev_assign_device_qps(struct accel_dpdk_cryptodev_io_channel *crypto_ch)
825 {
826 	struct accel_dpdk_cryptodev_device *device;
827 	struct accel_dpdk_cryptodev_qp *device_qp;
828 	uint32_t num_drivers = 0;
829 	bool qat_found = false;
830 
831 	pthread_mutex_lock(&g_device_lock);
832 
833 	TAILQ_FOREACH(device, &g_crypto_devices, link) {
834 		if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT && !qat_found) {
835 			/* For some QAT devices, the optimal qp to use is every 32nd as this spreads the
836 			 * workload out over the multiple virtual functions in the device. For the devices
837 			 * where this isn't the case, it doesn't hurt.
838 			 */
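			/* For illustration (assuming 64 total QAT qpairs and the default spread of 32):
			 * successive channels are handed qpair indices 0, 32, 1, 33, 2, 34, ... so that
			 * consecutive channels land on different virtual functions. */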
839 			TAILQ_FOREACH(device_qp, &device->qpairs, link) {
840 				if (device_qp->index != g_next_qat_index) {
841 					continue;
842 				}
843 				if (device_qp->in_use == false) {
844 					assert(crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_QAT] == NULL);
845 					crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_QAT] = device_qp;
846 					device_qp->in_use = true;
847 					g_next_qat_index = (g_next_qat_index + ACCEL_DPDK_CRYPTODEV_QAT_VF_SPREAD) % g_qat_total_qp;
848 					qat_found = true;
849 					num_drivers++;
850 					break;
851 				} else {
852 					/* if the preferred index is used, skip to the next one in this set. */
853 					g_next_qat_index = (g_next_qat_index + 1) % g_qat_total_qp;
854 				}
855 			}
856 		}
857 	}
858 
859 	/* For ACCEL_DPDK_CRYPTODEV_AESNI_MB and MLX5_PCI select devices in round-robin manner */
860 	device_qp = accel_dpdk_cryptodev_get_next_device_qpair(ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB);
861 	if (device_qp) {
862 		assert(crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB] == NULL);
863 		crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB] = device_qp;
864 		num_drivers++;
865 	}
866 
867 	device_qp = accel_dpdk_cryptodev_get_next_device_qpair(ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI);
868 	if (device_qp) {
869 		assert(crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI] == NULL);
870 		crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI] = device_qp;
871 		num_drivers++;
872 	}
873 
874 	pthread_mutex_unlock(&g_device_lock);
875 
876 	return num_drivers;
877 }
878 
879 static void
880 _accel_dpdk_cryptodev_destroy_cb(void *io_device, void *ctx_buf)
881 {
882 	struct accel_dpdk_cryptodev_io_channel *crypto_ch = (struct accel_dpdk_cryptodev_io_channel *)
883 			ctx_buf;
884 	int i;
885 
886 	pthread_mutex_lock(&g_device_lock);
887 	for (i = 0; i < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST; i++) {
888 		if (crypto_ch->device_qp[i]) {
889 			crypto_ch->device_qp[i]->in_use = false;
890 		}
891 	}
892 	pthread_mutex_unlock(&g_device_lock);
893 
894 	spdk_poller_unregister(&crypto_ch->poller);
895 }
896 
897 static int
898 _accel_dpdk_cryptodev_create_cb(void *io_device, void *ctx_buf)
899 {
900 	struct accel_dpdk_cryptodev_io_channel *crypto_ch = (struct accel_dpdk_cryptodev_io_channel *)
901 			ctx_buf;
902 
903 	crypto_ch->poller = SPDK_POLLER_REGISTER(accel_dpdk_cryptodev_poller, crypto_ch, 0);
904 	if (!accel_dpdk_cryptodev_assign_device_qps(crypto_ch)) {
905 		SPDK_ERRLOG("No crypto drivers assigned\n");
906 		spdk_poller_unregister(&crypto_ch->poller);
907 		return -EINVAL;
908 	}
909 
910 	/* We use this to queue tasks when the qpair is full or there are no resources in the pools */
911 	TAILQ_INIT(&crypto_ch->queued_tasks);
912 	TAILQ_INIT(&crypto_ch->completed_tasks);
913 
914 	return 0;
915 }
916 
917 static struct spdk_io_channel *
918 accel_dpdk_cryptodev_get_io_channel(void)
919 {
920 	return spdk_get_io_channel(&g_accel_dpdk_cryptodev_module);
921 }
922 
923 static size_t
924 accel_dpdk_cryptodev_ctx_size(void)
925 {
926 	return sizeof(struct accel_dpdk_cryptodev_task);
927 }
928 
929 static bool
930 accel_dpdk_cryptodev_supports_opcode(enum accel_opcode opc)
931 {
932 	switch (opc) {
933 	case ACCEL_OPC_ENCRYPT:
934 	case ACCEL_OPC_DECRYPT:
935 		return true;
936 	default:
937 		return false;
938 	}
939 }
940 
941 static int
942 accel_dpdk_cryptodev_submit_tasks(struct spdk_io_channel *_ch, struct spdk_accel_task *_task)
943 {
944 	struct accel_dpdk_cryptodev_task *task = SPDK_CONTAINEROF(_task, struct accel_dpdk_cryptodev_task,
945 			base);
946 	struct accel_dpdk_cryptodev_io_channel *ch = spdk_io_channel_get_ctx(_ch);
947 	int rc;
948 
949 	task->cryop_completed = 0;
950 	task->cryop_submitted = 0;
951 	task->cryop_total = 0;
952 	task->inplace = true;
953 	task->is_failed = false;
954 
955 	/* Check if crypto operation is inplace: no destination or source == destination */
956 	if (task->base.s.iovcnt == task->base.d.iovcnt) {
957 		if (memcmp(task->base.s.iovs, task->base.d.iovs, sizeof(struct iovec) * task->base.s.iovcnt) != 0) {
958 			task->inplace = false;
959 		}
960 	} else if (task->base.d.iovcnt != 0) {
961 		task->inplace = false;
962 	}
963 
964 	rc = accel_dpdk_cryptodev_process_task(ch, task);
965 	if (spdk_unlikely(rc)) {
966 		if (rc == -ENOMEM) {
967 			TAILQ_INSERT_TAIL(&ch->queued_tasks, task, link);
968 			rc = 0;
969 		} else if (rc == -EALREADY) {
970 			/* -EALREADY means that a task is completed, but it might be unsafe to complete
971 			 * it if we are in the submission path. Hence put it into a dedicated queue and
972 			 * process it during polling */
973 			TAILQ_INSERT_TAIL(&ch->completed_tasks, task, link);
974 			rc = 0;
975 		}
976 	}
977 
978 	return rc;
979 }
980 
981 /* Dummy function used by DPDK to free externally attached buffers to mbufs; we free them ourselves, but
982  * this callback has to be here. */
983 static void
984 shinfo_free_cb(void *arg1, void *arg2)
985 {
986 }
987 
988 static int
989 accel_dpdk_cryptodev_create(uint8_t index, uint16_t num_lcores)
990 {
991 	struct rte_cryptodev_qp_conf qp_conf = {
992 		.mp_session = g_session_mp,
993 #if RTE_VERSION < RTE_VERSION_NUM(22, 11, 0, 0)
994 		.mp_session_private = g_session_mp_priv
995 #endif
996 	};
997 	/* Setup queue pairs. */
998 	struct rte_cryptodev_config conf = { .socket_id = SPDK_ENV_SOCKET_ID_ANY };
999 	struct accel_dpdk_cryptodev_device *device;
1000 	uint8_t j, cdev_id, cdrv_id;
1001 	struct accel_dpdk_cryptodev_qp *dev_qp;
1002 	int rc;
1003 
1004 	device = calloc(1, sizeof(*device));
1005 	if (!device) {
1006 		return -ENOMEM;
1007 	}
1008 
1009 	/* Get details about this device. */
1010 	rte_cryptodev_info_get(index, &device->cdev_info);
1011 	cdrv_id = device->cdev_info.driver_id;
1012 	cdev_id = device->cdev_id = index;
1013 
1014 	if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_QAT) == 0) {
1015 		device->qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS;
1016 		device->type = ACCEL_DPDK_CRYPTODEV_DRIVER_QAT;
1017 	} else if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_AESNI_MB) == 0) {
1018 		device->qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS;
1019 		device->type = ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
1020 	} else if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_MLX5) == 0) {
1021 		device->qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS_MLX5;
1022 		device->type = ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI;
1023 	} else if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_QAT_ASYM) == 0) {
1024 		/* ACCEL_DPDK_CRYPTODEV_QAT_ASYM devices are not supported at this time. */
1025 		rc = 0;
1026 		goto err;
1027 	} else {
1028 		SPDK_ERRLOG("Failed to start device %u. Invalid driver name \"%s\"\n",
1029 			    cdev_id, device->cdev_info.driver_name);
1030 		rc = -EINVAL;
1031 		goto err;
1032 	}
1033 
1034 	/* Before going any further, make sure we have enough resources for this
1035 	 * device type to function.  We need a unique queue pair per core across each
1036 	 * device type to remain lockless....
1037 	 */
1038 	if ((rte_cryptodev_device_count_by_driver(cdrv_id) *
1039 	     device->cdev_info.max_nb_queue_pairs) < num_lcores) {
1040 		SPDK_ERRLOG("Insufficient unique queue pairs available for %s\n",
1041 			    device->cdev_info.driver_name);
1042 		SPDK_ERRLOG("Either add more crypto devices or decrease core count\n");
1043 		rc = -EINVAL;
1044 		goto err;
1045 	}
1046 
1047 	conf.nb_queue_pairs = device->cdev_info.max_nb_queue_pairs;
1048 	rc = rte_cryptodev_configure(cdev_id, &conf);
1049 	if (rc < 0) {
1050 		SPDK_ERRLOG("Failed to configure cryptodev %u: error %d\n",
1051 			    cdev_id, rc);
1052 		rc = -EINVAL;
1053 		goto err;
1054 	}
1055 
1056 	/* Pre-setup all potential qpairs now and assign them in the channel
1057 	 * callback. If we were to create them there, we'd have to stop the
1058 	 * entire device affecting all other threads that might be using it
1059 	 * even on other queue pairs.
1060 	 */
1061 	qp_conf.nb_descriptors = device->qp_desc_nr;
1062 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
1063 		rc = rte_cryptodev_queue_pair_setup(cdev_id, j, &qp_conf, SOCKET_ID_ANY);
1064 		if (rc < 0) {
1065 			SPDK_ERRLOG("Failed to setup queue pair %u on "
1066 				    "cryptodev %u: error %d\n", j, cdev_id, rc);
1067 			rc = -EINVAL;
1068 			goto err_qp_setup;
1069 		}
1070 	}
1071 
1072 	rc = rte_cryptodev_start(cdev_id);
1073 	if (rc < 0) {
1074 		SPDK_ERRLOG("Failed to start device %u: error %d\n", cdev_id, rc);
1075 		rc = -EINVAL;
1076 		goto err_dev_start;
1077 	}
1078 
1079 	TAILQ_INIT(&device->qpairs);
1080 	/* Build up lists of device/qp combinations per PMD */
1081 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
1082 		dev_qp = calloc(1, sizeof(*dev_qp));
1083 		if (!dev_qp) {
1084 			rc = -ENOMEM;
1085 			goto err_qp_alloc;
1086 		}
1087 		dev_qp->device = device;
1088 		dev_qp->qp = j;
1089 		dev_qp->in_use = false;
1090 		TAILQ_INSERT_TAIL(&device->qpairs, dev_qp, link);
1091 		if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT) {
1092 			dev_qp->index = g_qat_total_qp++;
1093 		}
1094 	}
1095 	/* Add to our list of available crypto devices. */
1096 	TAILQ_INSERT_TAIL(&g_crypto_devices, device, link);
1097 
1098 	return 0;
1099 
1100 err_qp_alloc:
1101 	TAILQ_FOREACH(dev_qp, &device->qpairs, link) {
1102 		if (dev_qp->device->cdev_id != device->cdev_id) {
1103 			continue;
1104 		}
1105 		free(dev_qp);
1106 		if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT) {
1107 			assert(g_qat_total_qp);
1108 			g_qat_total_qp--;
1109 		}
1110 	}
1111 	rte_cryptodev_stop(cdev_id);
1112 err_dev_start:
1113 err_qp_setup:
1114 	rte_cryptodev_close(cdev_id);
1115 err:
1116 	free(device);
1117 
1118 	return rc;
1119 }
1120 
1121 static void
1122 accel_dpdk_cryptodev_release(struct accel_dpdk_cryptodev_device *device)
1123 {
1124 	struct accel_dpdk_cryptodev_qp *dev_qp, *tmp;
1125 
1126 	assert(device);
1127 
1128 	TAILQ_FOREACH_SAFE(dev_qp, &device->qpairs, link, tmp) {
1129 		free(dev_qp);
1130 	}
1131 	if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT) {
1132 		assert(g_qat_total_qp >= device->cdev_info.max_nb_queue_pairs);
1133 		g_qat_total_qp -= device->cdev_info.max_nb_queue_pairs;
1134 	}
1135 	rte_cryptodev_stop(device->cdev_id);
1136 	rte_cryptodev_close(device->cdev_id);
1137 	free(device);
1138 }
1139 
1140 static int
1141 accel_dpdk_cryptodev_init(void)
1142 {
1143 	uint8_t cdev_count;
1144 	uint8_t cdev_id;
1145 	int i, rc;
1146 	struct accel_dpdk_cryptodev_device *device, *tmp_dev;
1147 	unsigned int max_sess_size = 0, sess_size;
1148 	uint16_t num_lcores = rte_lcore_count();
1149 	char aesni_args[32];
1150 
1151 	/* Only the first call via module init should init the crypto drivers. */
1152 	if (g_session_mp != NULL) {
1153 		return 0;
1154 	}
1155 
1156 	/* We always init ACCEL_DPDK_CRYPTODEV_AESNI_MB */
1157 	snprintf(aesni_args, sizeof(aesni_args), "max_nb_queue_pairs=%d",
1158 		 ACCEL_DPDK_CRYPTODEV_AESNI_MB_NUM_QP);
1159 	rc = rte_vdev_init(ACCEL_DPDK_CRYPTODEV_AESNI_MB, aesni_args);
1160 	if (rc) {
1161 		SPDK_NOTICELOG("Failed to create virtual PMD %s: error %d. "
1162 			       "Possibly %s is not supported by DPDK library. "
1163 			       "Keep going...\n", ACCEL_DPDK_CRYPTODEV_AESNI_MB, rc, ACCEL_DPDK_CRYPTODEV_AESNI_MB);
1164 	}
1165 
1166 	/* If we have no crypto devices, there's no reason to continue. */
1167 	cdev_count = rte_cryptodev_count();
1168 	SPDK_NOTICELOG("Found crypto devices: %d\n", (int)cdev_count);
1169 	if (cdev_count == 0) {
1170 		return 0;
1171 	}
1172 
1173 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
1174 	if (g_mbuf_offset < 0) {
1175 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
1176 		return -EINVAL;
1177 	}
1178 
1179 	/* Create global mempools, shared by all devices regardless of type */
1180 	/* First determine the max session size; most pools are shared by all the devices,
1181 	 * so we need to find the global max session size. */
1182 	for (cdev_id = 0; cdev_id < cdev_count; cdev_id++) {
1183 		sess_size = rte_cryptodev_sym_get_private_session_size(cdev_id);
1184 		if (sess_size > max_sess_size) {
1185 			max_sess_size = sess_size;
1186 		}
1187 	}
1188 
1189 #if RTE_VERSION < RTE_VERSION_NUM(22, 11, 0, 0)
1190 	g_session_mp_priv = rte_mempool_create("dpdk_crypto_ses_mp_priv",
1191 					       ACCEL_DPDK_CRYPTODEV_NUM_SESSIONS, max_sess_size, ACCEL_DPDK_CRYPTODEV_SESS_MEMPOOL_CACHE_SIZE, 0,
1192 					       NULL, NULL, NULL, NULL, SOCKET_ID_ANY, 0);
1193 	if (g_session_mp_priv == NULL) {
1194 		SPDK_ERRLOG("Cannot create private session pool max size 0x%x\n", max_sess_size);
1195 		return -ENOMEM;
1196 	}
1197 
1198 	/* When the session private data mempool is allocated, the element size for the session mempool
1199 	 * should be 0. */
1200 	max_sess_size = 0;
1201 #endif
1202 
1203 	g_session_mp = rte_cryptodev_sym_session_pool_create("dpdk_crypto_ses_mp",
1204 			ACCEL_DPDK_CRYPTODEV_NUM_SESSIONS, max_sess_size, ACCEL_DPDK_CRYPTODEV_SESS_MEMPOOL_CACHE_SIZE, 0,
1205 			SOCKET_ID_ANY);
1206 	if (g_session_mp == NULL) {
1207 		SPDK_ERRLOG("Cannot create session pool max size 0x%x\n", max_sess_size);
1208 		rc = -ENOMEM;
1209 		goto error_create_session_mp;
1210 	}
1211 
1212 	g_mbuf_mp = rte_pktmbuf_pool_create("dpdk_crypto_mbuf_mp", ACCEL_DPDK_CRYPTODEV_NUM_MBUFS,
1213 					    ACCEL_DPDK_CRYPTODEV_POOL_CACHE_SIZE,
1214 					    0, 0, SPDK_ENV_SOCKET_ID_ANY);
1215 	if (g_mbuf_mp == NULL) {
1216 		SPDK_ERRLOG("Cannot create mbuf pool\n");
1217 		rc = -ENOMEM;
1218 		goto error_create_mbuf;
1219 	}
1220 
1221 	/* We use per-op private data, as suggested by DPDK, to store space for the
1222 	 * crypto xforms and the IV. */
1223 	g_crypto_op_mp = rte_crypto_op_pool_create("dpdk_crypto_op_mp",
1224 			 RTE_CRYPTO_OP_TYPE_SYMMETRIC, ACCEL_DPDK_CRYPTODEV_NUM_MBUFS, ACCEL_DPDK_CRYPTODEV_POOL_CACHE_SIZE,
1225 			 (ACCEL_DPDK_CRYPTODEV_DEFAULT_NUM_XFORMS * sizeof(struct rte_crypto_sym_xform)) +
1226 			 ACCEL_DPDK_CRYPTODEV_IV_LENGTH, rte_socket_id());
1227 	if (g_crypto_op_mp == NULL) {
1228 		SPDK_ERRLOG("Cannot create op pool\n");
1229 		rc = -ENOMEM;
1230 		goto error_create_op;
1231 	}
1232 
1233 	/* Init all devices */
1234 	for (i = 0; i < cdev_count; i++) {
1235 		rc = accel_dpdk_cryptodev_create(i, num_lcores);
1236 		if (rc) {
1237 			goto err;
1238 		}
1239 	}
1240 
1241 	g_shinfo.free_cb = shinfo_free_cb;
1242 
1243 	spdk_io_device_register(&g_accel_dpdk_cryptodev_module, _accel_dpdk_cryptodev_create_cb,
1244 				_accel_dpdk_cryptodev_destroy_cb, sizeof(struct accel_dpdk_cryptodev_io_channel),
1245 				"accel_dpdk_cryptodev");
1246 
1247 	return 0;
1248 
1249 	/* Error cleanup paths. */
1250 err:
1251 	TAILQ_FOREACH_SAFE(device, &g_crypto_devices, link, tmp_dev) {
1252 		TAILQ_REMOVE(&g_crypto_devices, device, link);
1253 		accel_dpdk_cryptodev_release(device);
1254 	}
1255 	rte_mempool_free(g_crypto_op_mp);
1256 	g_crypto_op_mp = NULL;
1257 error_create_op:
1258 	rte_mempool_free(g_mbuf_mp);
1259 	g_mbuf_mp = NULL;
1260 error_create_mbuf:
1261 	rte_mempool_free(g_session_mp);
1262 	g_session_mp = NULL;
1263 error_create_session_mp:
1264 	if (g_session_mp_priv != NULL) {
1265 		rte_mempool_free(g_session_mp_priv);
1266 		g_session_mp_priv = NULL;
1267 	}
1268 	return rc;
1269 }
1270 
1271 static void
1272 accel_dpdk_cryptodev_fini_cb(void *io_device)
1273 {
1274 	struct accel_dpdk_cryptodev_device *device, *tmp;
1275 
1276 	TAILQ_FOREACH_SAFE(device, &g_crypto_devices, link, tmp) {
1277 		TAILQ_REMOVE(&g_crypto_devices, device, link);
1278 		accel_dpdk_cryptodev_release(device);
1279 	}
1280 	rte_vdev_uninit(ACCEL_DPDK_CRYPTODEV_AESNI_MB);
1281 
1282 	rte_mempool_free(g_crypto_op_mp);
1283 	rte_mempool_free(g_mbuf_mp);
1284 	rte_mempool_free(g_session_mp);
1285 	if (g_session_mp_priv != NULL) {
1286 		rte_mempool_free(g_session_mp_priv);
1287 	}
1288 
1289 	spdk_accel_module_finish();
1290 }
1291 
1292 /* Called when the entire module is being torn down. */
1293 static void
1294 accel_dpdk_cryptodev_fini(void *ctx)
1295 {
1296 	if (g_crypto_op_mp) {
1297 		spdk_io_device_unregister(&g_accel_dpdk_cryptodev_module, accel_dpdk_cryptodev_fini_cb);
1298 	}
1299 }
1300 
1301 static void
1302 accel_dpdk_cryptodev_key_handle_session_free(struct accel_dpdk_cryptodev_device *device,
1303 		void *session)
1304 {
1305 #if RTE_VERSION >= RTE_VERSION_NUM(22, 11, 0, 0)
1306 	assert(device != NULL);
1307 
1308 	rte_cryptodev_sym_session_free(device->cdev_id, session);
1309 #else
1310 	rte_cryptodev_sym_session_free(session);
1311 #endif
1312 }
1313 
1314 static void *
1315 accel_dpdk_cryptodev_key_handle_session_create(struct accel_dpdk_cryptodev_device *device,
1316 		struct rte_crypto_sym_xform *cipher_xform)
1317 {
1318 	void *session;
1319 
1320 #if RTE_VERSION >= RTE_VERSION_NUM(22, 11, 0, 0)
1321 	session = rte_cryptodev_sym_session_create(device->cdev_id, cipher_xform, g_session_mp);
1322 #else
1323 	session = rte_cryptodev_sym_session_create(g_session_mp);
1324 	if (!session) {
1325 		return NULL;
1326 	}
1327 
1328 	if (rte_cryptodev_sym_session_init(device->cdev_id, session, cipher_xform, g_session_mp_priv) < 0) {
1329 		accel_dpdk_cryptodev_key_handle_session_free(device, session);
1330 		return NULL;
1331 	}
1332 #endif
1333 
1334 	return session;
1335 }
1336 
1337 static int
1338 accel_dpdk_cryptodev_key_handle_configure(struct spdk_accel_crypto_key *key,
1339 		struct accel_dpdk_cryptodev_key_handle *key_handle)
1340 {
1341 	struct accel_dpdk_cryptodev_key_priv *priv = key->priv;
1342 
1343 	key_handle->cipher_xform.type = RTE_CRYPTO_SYM_XFORM_CIPHER;
1344 	key_handle->cipher_xform.cipher.iv.offset = ACCEL_DPDK_CRYPTODEV_IV_OFFSET;
1345 	key_handle->cipher_xform.cipher.iv.length = ACCEL_DPDK_CRYPTODEV_IV_LENGTH;
1346 
1347 	switch (priv->cipher) {
1348 	case ACCEL_DPDK_CRYPTODEV_CIPHER_AES_CBC:
1349 		key_handle->cipher_xform.cipher.key.data = key->key;
1350 		key_handle->cipher_xform.cipher.key.length = key->key_size;
1351 		key_handle->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_CBC;
1352 		break;
1353 	case ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS:
1354 		key_handle->cipher_xform.cipher.key.data = priv->xts_key;
1355 		key_handle->cipher_xform.cipher.key.length = key->key_size + key->key2_size;
1356 		key_handle->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_XTS;
1357 		break;
1358 	default:
1359 		SPDK_ERRLOG("Invalid cipher name %s.\n", key->param.cipher);
1360 		return -EINVAL;
1361 	}
1362 
1363 	key_handle->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_ENCRYPT;
1364 	key_handle->session_encrypt = accel_dpdk_cryptodev_key_handle_session_create(key_handle->device,
1365 				      &key_handle->cipher_xform);
1366 	if (!key_handle->session_encrypt) {
1367 		SPDK_ERRLOG("Failed to init encrypt session\n");
1368 		return -EINVAL;
1369 	}
1370 
1371 	key_handle->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_DECRYPT;
1372 	key_handle->session_decrypt = accel_dpdk_cryptodev_key_handle_session_create(key_handle->device,
1373 				      &key_handle->cipher_xform);
1374 	if (!key_handle->session_decrypt) {
1375 		SPDK_ERRLOG("Failed to init decrypt session:");
1376 		accel_dpdk_cryptodev_key_handle_session_free(key_handle->device, key_handle->session_encrypt);
1377 		return -EINVAL;
1378 	}
1379 
1380 	return 0;
1381 }
1382 
1383 static int
1384 accel_dpdk_cryptodev_validate_parameters(enum accel_dpdk_cryptodev_driver_type driver,
1385 		enum accel_dpdk_crypto_dev_cipher_type cipher, struct spdk_accel_crypto_key *key)
1386 {
1387 	/* Check that all required parameters exist */
1388 	switch (cipher) {
1389 	case ACCEL_DPDK_CRYPTODEV_CIPHER_AES_CBC:
1390 		if (!key->key || !key->key_size) {
1391 			SPDK_ERRLOG("ACCEL_DPDK_CRYPTODEV_AES_CBC requires a key\n");
1392 			return -1;
1393 		}
1394 		if (key->key2 || key->key2_size) {
1395 			SPDK_ERRLOG("ACCEL_DPDK_CRYPTODEV_AES_CBC doesn't use key2\n");
1396 			return -1;
1397 		}
1398 		break;
1399 	case ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS:
1400 		if (!key->key || !key->key_size || !key->key2 || !key->key2_size) {
1401 			SPDK_ERRLOG("ACCEL_DPDK_CRYPTODEV_AES_XTS requires both key and key2\n");
1402 			return -1;
1403 		}
1404 		break;
1405 	default:
1406 		return -1;
1407 	}
1408 
1409 	/* Check driver/cipher combinations and key lengths */
1410 	switch (cipher) {
1411 	case ACCEL_DPDK_CRYPTODEV_CIPHER_AES_CBC:
1412 		if (driver == ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI) {
1413 			SPDK_ERRLOG("Driver %s only supports cipher %s\n",
1414 				    g_driver_names[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI],
1415 				    g_cipher_names[ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS]);
1416 			return -1;
1417 		}
1418 		if (key->key_size != ACCEL_DPDK_CRYPTODEV_AES_CBC_KEY_LENGTH) {
1419 			SPDK_ERRLOG("Invalid key size %zu for cipher %s, should be %d\n", key->key_size,
1420 				    g_cipher_names[ACCEL_DPDK_CRYPTODEV_CIPHER_AES_CBC], ACCEL_DPDK_CRYPTODEV_AES_CBC_KEY_LENGTH);
1421 			return -1;
1422 		}
1423 		break;
1424 	case ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS:
1425 		switch (driver) {
1426 		case ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI:
1427 			if (key->key_size != ACCEL_DPDK_CRYPTODEV_AES_XTS_256_BLOCK_KEY_LENGTH &&
1428 			    key->key_size != ACCEL_DPDK_CRYPTODEV_AES_XTS_512_BLOCK_KEY_LENGTH) {
1429 				SPDK_ERRLOG("Invalid key size %zu for driver %s, cipher %s, supported %d or %d\n",
1430 					    key->key_size, g_driver_names[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI],
1431 					    g_cipher_names[ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS],
1432 					    ACCEL_DPDK_CRYPTODEV_AES_XTS_256_BLOCK_KEY_LENGTH,
1433 					    ACCEL_DPDK_CRYPTODEV_AES_XTS_512_BLOCK_KEY_LENGTH);
1434 				return -1;
1435 			}
1436 			break;
1437 		case ACCEL_DPDK_CRYPTODEV_DRIVER_QAT:
1438 		case ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB:
1439 			if (key->key_size != ACCEL_DPDK_CRYPTODEV_AES_XTS_128_BLOCK_KEY_LENGTH) {
1440 				SPDK_ERRLOG("Invalid key size %zu, supported %d\n", key->key_size,
1441 					    ACCEL_DPDK_CRYPTODEV_AES_XTS_128_BLOCK_KEY_LENGTH);
1442 				return -1;
1443 			}
1444 			break;
1445 		default:
1446 			SPDK_ERRLOG("Incorrect driver type %d\n", driver);
1447 			assert(0);
1448 			return -1;
1449 		}
1450 		if (key->key2_size != ACCEL_DPDK_CRYPTODEV_AES_XTS_TWEAK_KEY_LENGTH) {
1451 			SPDK_ERRLOG("Cipher %s requires key2 size %d\n",
1452 				    g_cipher_names[ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS], ACCEL_DPDK_CRYPTODEV_AES_XTS_TWEAK_KEY_LENGTH);
1453 			return -1;
1454 		}
1455 		break;
1456 	}
1457 
1458 	return 0;
1459 }
1460 
1461 static void
1462 accel_dpdk_cryptodev_key_deinit(struct spdk_accel_crypto_key *key)
1463 {
1464 	struct accel_dpdk_cryptodev_key_handle *key_handle, *key_handle_tmp;
1465 	struct accel_dpdk_cryptodev_key_priv *priv = key->priv;
1466 
1467 	TAILQ_FOREACH_SAFE(key_handle, &priv->dev_keys, link, key_handle_tmp) {
1468 		accel_dpdk_cryptodev_key_handle_session_free(key_handle->device, key_handle->session_encrypt);
1469 		accel_dpdk_cryptodev_key_handle_session_free(key_handle->device, key_handle->session_decrypt);
1470 		TAILQ_REMOVE(&priv->dev_keys, key_handle, link);
1471 		spdk_memset_s(key_handle, sizeof(*key_handle), 0, sizeof(*key_handle));
1472 		free(key_handle);
1473 	}
1474 
1475 	if (priv->xts_key) {
1476 		spdk_memset_s(priv->xts_key, key->key_size + key->key2_size, 0, key->key_size + key->key2_size);
1477 	}
1478 	free(priv->xts_key);
1479 	free(priv);
1480 }
1481 
1482 static int
1483 accel_dpdk_cryptodev_key_init(struct spdk_accel_crypto_key *key)
1484 {
1485 	struct accel_dpdk_cryptodev_device *device;
1486 	struct accel_dpdk_cryptodev_key_priv *priv;
1487 	struct accel_dpdk_cryptodev_key_handle *key_handle;
1488 	enum accel_dpdk_cryptodev_driver_type driver;
1489 	enum accel_dpdk_crypto_dev_cipher_type cipher;
1490 	int rc;
1491 
1492 	if (!key->param.cipher) {
1493 		SPDK_ERRLOG("Cipher is missing\n");
1494 		return -EINVAL;
1495 	}
1496 
1497 	if (strcmp(key->param.cipher, ACCEL_DPDK_CRYPTODEV_AES_CBC) == 0) {
1498 		cipher = ACCEL_DPDK_CRYPTODEV_CIPHER_AES_CBC;
1499 	} else if (strcmp(key->param.cipher, ACCEL_DPDK_CRYPTODEV_AES_XTS) == 0) {
1500 		cipher = ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS;
1501 	} else {
1502 		SPDK_ERRLOG("Unsupported cipher name %s.\n", key->param.cipher);
1503 		return -EINVAL;
1504 	}
1505 
1506 	driver = g_dpdk_cryptodev_driver;
1507 
1508 	if (accel_dpdk_cryptodev_validate_parameters(driver, cipher, key)) {
1509 		return -EINVAL;
1510 	}
1511 
1512 	priv = calloc(1, sizeof(*priv));
1513 	if (!priv) {
1514 		SPDK_ERRLOG("Memory allocation failed\n");
1515 		return -ENOMEM;
1516 	}
1517 	key->priv = priv;
1518 	priv->driver = driver;
1519 	priv->cipher = cipher;
1520 	TAILQ_INIT(&priv->dev_keys);
1521 
1522 	if (cipher == ACCEL_DPDK_CRYPTODEV_CIPHER_AES_XTS) {
1523 		/* DPDK expects the keys to be concatenated together. */
1524 		priv->xts_key = calloc(key->key_size + key->key2_size + 1, sizeof(char));
1525 		if (!priv->xts_key) {
1526 			SPDK_ERRLOG("Memory allocation failed\n");
1527 			accel_dpdk_cryptodev_key_deinit(key);
1528 			return -ENOMEM;
1529 		}
1530 		memcpy(priv->xts_key, key->key, key->key_size);
1531 		memcpy(priv->xts_key + key->key_size, key->key2, key->key2_size);
1532 	}
1533 
1534 	pthread_mutex_lock(&g_device_lock);
1535 	TAILQ_FOREACH(device, &g_crypto_devices, link) {
1536 		if (device->type != driver) {
1537 			continue;
1538 		}
1539 		key_handle = calloc(1, sizeof(*key_handle));
1540 		if (!key_handle) {
1541 			pthread_mutex_unlock(&g_device_lock);
1542 			accel_dpdk_cryptodev_key_deinit(key);
1543 			return -ENOMEM;
1544 		}
1545 		key_handle->device = device;
1546 		TAILQ_INSERT_TAIL(&priv->dev_keys, key_handle, link);
1547 		rc = accel_dpdk_cryptodev_key_handle_configure(key, key_handle);
1548 		if (rc) {
1549 			pthread_mutex_unlock(&g_device_lock);
1550 			accel_dpdk_cryptodev_key_deinit(key);
1551 			return rc;
1552 		}
1553 		if (driver != ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI) {
1554 			/* For MLX5_PCI we need to register a key on each device since
1555 			 * the key is bound to a specific Protection Domain,
1556 			 * so don't break the loop */
1557 			break;
1558 		}
1559 	}
1560 	pthread_mutex_unlock(&g_device_lock);
1561 
1562 	if (TAILQ_EMPTY(&priv->dev_keys)) {
		free(priv->xts_key);
1563 		free(priv);
1564 		return -ENODEV;
1565 	}
1566 
1567 	return 0;
1568 }
1569 
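/* A rough sketch of the config entries emitted below (exact formatting is up to the JSON write context):
 *   { "method": "dpdk_cryptodev_scan_accel_module" }
 *   { "method": "dpdk_cryptodev_set_driver", "params": { "driver_name": "crypto_qat" } }
 * where "crypto_qat" stands for whichever driver is currently selected. */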
1570 static void
1571 accel_dpdk_cryptodev_write_config_json(struct spdk_json_write_ctx *w)
1572 {
1573 	spdk_json_write_object_begin(w);
1574 	spdk_json_write_named_string(w, "method", "dpdk_cryptodev_scan_accel_module");
1575 	spdk_json_write_object_end(w);
1576 
1577 	spdk_json_write_object_begin(w);
1578 	spdk_json_write_named_string(w, "method", "dpdk_cryptodev_set_driver");
1579 	spdk_json_write_named_object_begin(w, "params");
1580 	spdk_json_write_named_string(w, "driver_name", g_driver_names[g_dpdk_cryptodev_driver]);
1581 	spdk_json_write_object_end(w);
1582 	spdk_json_write_object_end(w);
1583 }
1584 
1585 static struct spdk_accel_module_if g_accel_dpdk_cryptodev_module = {
1586 	.module_init		= accel_dpdk_cryptodev_init,
1587 	.module_fini		= accel_dpdk_cryptodev_fini,
1588 	.write_config_json	= accel_dpdk_cryptodev_write_config_json,
1589 	.get_ctx_size		= accel_dpdk_cryptodev_ctx_size,
1590 	.name			= "dpdk_cryptodev",
1591 	.supports_opcode	= accel_dpdk_cryptodev_supports_opcode,
1592 	.get_io_channel		= accel_dpdk_cryptodev_get_io_channel,
1593 	.submit_tasks		= accel_dpdk_cryptodev_submit_tasks,
1594 	.crypto_key_init	= accel_dpdk_cryptodev_key_init,
1595 	.crypto_key_deinit	= accel_dpdk_cryptodev_key_deinit,
1596 };
1597