xref: /spdk/module/bdev/crypto/vbdev_crypto.c (revision 927f1fd57bd004df581518466ec4c1b8083e5d23)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES.
7  *   All rights reserved.
8  *
9  *   Redistribution and use in source and binary forms, with or without
10  *   modification, are permitted provided that the following conditions
11  *   are met:
12  *
13  *     * Redistributions of source code must retain the above copyright
14  *       notice, this list of conditions and the following disclaimer.
15  *     * Redistributions in binary form must reproduce the above copyright
16  *       notice, this list of conditions and the following disclaimer in
17  *       the documentation and/or other materials provided with the
18  *       distribution.
19  *     * Neither the name of Intel Corporation nor the names of its
20  *       contributors may be used to endorse or promote products derived
21  *       from this software without specific prior written permission.
22  *
23  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
29  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
30  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
33  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include "vbdev_crypto.h"
37 
38 #include "spdk/env.h"
39 #include "spdk/likely.h"
40 #include "spdk/endian.h"
41 #include "spdk/thread.h"
42 #include "spdk/bdev_module.h"
43 #include "spdk/log.h"
44 
45 #include <rte_config.h>
46 #include <rte_bus_vdev.h>
47 #include <rte_crypto.h>
48 #include <rte_cryptodev.h>
49 #include <rte_mbuf_dyn.h>
50 
51 /* Used to store IO context in mbuf */
52 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
53 	.name = "context_bdev_io",
54 	.size = sizeof(uint64_t),
55 	.align = __alignof__(uint64_t),
56 	.flags = 0,
57 };
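/* Byte offset of that field inside each mbuf, filled in by
 * rte_mbuf_dynfield_register() at init time. Every mbuf we build gets the owning
 * bdev_io pointer stashed there and read back on dequeue via
 * *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *).
 */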
58 static int g_mbuf_offset;
59 
60 /* To add support for new device types, follow the examples below.
61  * Note that the string names are defined by the DPDK PMD in question so be
62  * sure to use the exact names.
63  */
64 #define MAX_NUM_DRV_TYPES 3
65 
66 /* The VF spread is the number of queue pairs between virtual functions; we use this to
67  * load balance the QAT device.
68  */
69 #define QAT_VF_SPREAD 32
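/* QAT exposes many virtual functions, each contributing its queue pairs to
 * g_device_qp_qat in order; dev_qp->index combined with this stride lets the qp
 * assignment logic hop across VFs rather than exhausting one VF before moving on.
 */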
70 static uint8_t g_qat_total_qp = 0;
71 static uint8_t g_next_qat_index;
72 
73 const char *g_driver_names[MAX_NUM_DRV_TYPES] = { AESNI_MB, QAT, MLX5 };
74 
75 /* Global list of available crypto devices. */
76 struct vbdev_dev {
77 	struct rte_cryptodev_info	cdev_info;	/* includes device friendly name */
78 	uint8_t				cdev_id;	/* identifier for the device */
79 	TAILQ_ENTRY(vbdev_dev)		link;
80 };
81 static TAILQ_HEAD(, vbdev_dev) g_vbdev_devs = TAILQ_HEAD_INITIALIZER(g_vbdev_devs);
82 
83 /* Global list and lock for unique device/queue pair combos. We keep 1 list per supported PMD
84  * so that we can optimize per PMD where it makes sense. For example, with QAT there is an optimal
85  * pattern for assigning queue pairs whereas with AESNI there is not.
86  */
87 struct device_qp {
88 	struct vbdev_dev		*device;	/* ptr to crypto device */
89 	uint8_t				qp;		/* queue pair for this node */
90 	bool				in_use;		/* whether this node is in use or not */
91 	uint8_t				index;		/* used by QAT to load balance placement of qpairs */
92 	TAILQ_ENTRY(device_qp)		link;
93 };
94 static TAILQ_HEAD(, device_qp) g_device_qp_qat = TAILQ_HEAD_INITIALIZER(g_device_qp_qat);
95 static TAILQ_HEAD(, device_qp) g_device_qp_aesni_mb = TAILQ_HEAD_INITIALIZER(g_device_qp_aesni_mb);
96 static TAILQ_HEAD(, device_qp) g_device_qp_mlx5 = TAILQ_HEAD_INITIALIZER(g_device_qp_mlx5);
97 static pthread_mutex_t g_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
98 
99 
100 /* In order to limit the number of resources we need to do one crypto
101  * operation per LBA (we use LBA as IV), we tell the bdev layer that
102  * our max IO size is something reasonable. Units here are in bytes.
103  */
104 #define CRYPTO_MAX_IO		(64 * 1024)
105 
106 /* This controls how many ops will be dequeued from the crypto driver in one run
107  * of the poller. It is mainly a performance knob as it effectively determines how
108  * much work the poller has to do.  However even that can vary between crypto drivers
109  * as the AESNI_MB driver for example does all the crypto work on dequeue whereas the
110  * QAT driver just dequeues what has been completed already.
111  */
112 #define MAX_DEQUEUE_BURST_SIZE	64
113 
114 /* When enqueueing, we need to supply the crypto driver with an array of pointers to
115  * operation structs. As each of these can be max 512B, we can adjust the CRYPTO_MAX_IO
116  * value in conjunction with the other defines to make sure we're not using crazy amounts
117  * of memory. All of these numbers can and probably should be adjusted based on the
118  * workload. By default we'll use the worst case (smallest) block size for the
119  * minimum number of array entries. As an example, a CRYPTO_MAX_IO size of 64K with 512B
120  * blocks would give us an enqueue array size of 128.
121  */
122 #define MAX_ENQUEUE_ARRAY_SIZE (CRYPTO_MAX_IO / 512)
123 
124 /* The number of MBUFS we need must be a power of two and, to support other small IOs
125  * in addition to the limits mentioned above, we go to the next power of two. It is a
126  * big number because it is one mempool for both source and destination mbufs. It may
127  * need to be bigger to support multiple crypto drivers at once.
128  */
129 #define NUM_MBUFS		32768
130 #define POOL_CACHE_SIZE		256
131 #define MAX_CRYPTO_VOLUMES	128
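/* Each crypto vbdev needs one encrypt and one decrypt session, hence two sessions
 * per volume.
 */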
132 #define NUM_SESSIONS		(2 * MAX_CRYPTO_VOLUMES)
133 #define SESS_MEMPOOL_CACHE_SIZE 0
134 uint8_t g_number_of_claimed_volumes = 0;
135 
136 /* This is the max number of IOs we can supply to any crypto device QP at one time.
137  * It can vary between drivers.
138  */
139 #define CRYPTO_QP_DESCRIPTORS	2048
140 
141 /* At this moment DPDK descriptor allocation for mlx5 has some issues. We use 512
142  * as a compromise value between performance and the time spent on initialization. */
143 #define CRYPTO_QP_DESCRIPTORS_MLX5	512
144 
145 #define AESNI_MB_NUM_QP		64
146 
147 /* Common for supported devices. */
148 #define DEFAULT_NUM_XFORMS           2
149 #define IV_OFFSET (sizeof(struct rte_crypto_op) + \
150                 sizeof(struct rte_crypto_sym_op) + \
151                 (DEFAULT_NUM_XFORMS * \
152                  sizeof(struct rte_crypto_sym_xform)))
153 #define IV_LENGTH		     16
154 #define QUEUED_OP_OFFSET (IV_OFFSET + IV_LENGTH)
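/* Rough per-op layout implied by the offsets above (struct sizes are whatever the
 * DPDK build defines): the rte_crypto_op and rte_crypto_sym_op come first, the
 * per-op private area follows with DEFAULT_NUM_XFORMS sym_xforms, then the 16-byte
 * IV at IV_OFFSET and finally our vbdev_crypto_op at QUEUED_OP_OFFSET.
 */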
155 
156 static void _complete_internal_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
157 static void _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
158 static void _complete_internal_write(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
159 static void vbdev_crypto_examine(struct spdk_bdev *bdev);
160 static int vbdev_crypto_claim(const char *bdev_name);
161 static void vbdev_crypto_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
162 
163 struct bdev_names {
164 	struct vbdev_crypto_opts	*opts;
165 	TAILQ_ENTRY(bdev_names)		link;
166 };
167 
168 /* List of crypto_bdev names and their base bdevs via configuration file. */
169 static TAILQ_HEAD(, bdev_names) g_bdev_names = TAILQ_HEAD_INITIALIZER(g_bdev_names);
170 
171 struct vbdev_crypto {
172 	struct spdk_bdev		*base_bdev;		/* the thing we're attaching to */
173 	struct spdk_bdev_desc		*base_desc;		/* its descriptor we get from open */
174 	struct spdk_bdev		crypto_bdev;		/* the crypto virtual bdev */
175 	struct vbdev_crypto_opts	*opts;			/* crypto options such as key, cipher */
176 	uint32_t			qp_desc_nr;             /* number of qp descriptors */
177 	struct rte_cryptodev_sym_session *session_encrypt;	/* encryption session for this bdev */
178 	struct rte_cryptodev_sym_session *session_decrypt;	/* decryption session for this bdev */
179 	struct rte_crypto_sym_xform	cipher_xform;		/* crypto control struct for this bdev */
180 	TAILQ_ENTRY(vbdev_crypto)	link;
181 	struct spdk_thread		*thread;		/* thread where base device is opened */
182 };
183 
184 /* List of virtual bdevs and associated info for each. We keep the device friendly name here even
185  * though it's also in the device struct because we use it early on.
186  */
187 static TAILQ_HEAD(, vbdev_crypto) g_vbdev_crypto = TAILQ_HEAD_INITIALIZER(g_vbdev_crypto);
188 
189 /* Shared mempools between all devices on this system */
190 static struct rte_mempool *g_session_mp = NULL;
191 static struct rte_mempool *g_session_mp_priv = NULL;
192 static struct rte_mempool *g_mbuf_mp = NULL;            /* mbuf mempool */
193 static struct rte_mempool *g_crypto_op_mp = NULL;	/* crypto operations, must be rte* mempool */
194 
195 static struct rte_mbuf_ext_shared_info g_shinfo = {};   /* used by DPDK mbuf macro */
196 
197 /* For queueing up crypto operations that we can't submit for some reason */
198 struct vbdev_crypto_op {
199 	uint8_t					cdev_id;
200 	uint8_t					qp;
201 	struct rte_crypto_op			*crypto_op;
202 	struct spdk_bdev_io			*bdev_io;
203 	TAILQ_ENTRY(vbdev_crypto_op)		link;
204 };
205 #define QUEUED_OP_LENGTH (sizeof(struct vbdev_crypto_op))
206 
207 /* The crypto vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
208  * We store things in here that are needed on per thread basis like the base_channel for this thread,
209  * and the poller for this thread.
210  */
211 struct crypto_io_channel {
212 	struct spdk_io_channel		*base_ch;		/* IO channel of base device */
213 	struct spdk_poller		*poller;		/* completion poller */
214 	struct device_qp		*device_qp;		/* unique device/qp combination for this channel */
215 	TAILQ_HEAD(, spdk_bdev_io)	pending_cry_ios;	/* outstanding operations to the crypto device */
216 	struct spdk_io_channel_iter	*iter;			/* used with for_each_channel in reset */
217 	TAILQ_HEAD(, vbdev_crypto_op)	queued_cry_ops;		/* queued for re-submission to CryptoDev */
218 };
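/* Reset flow: _ch_quiesce() stores the for_each_channel iterator in 'iter'; the
 * poller then fails any IO completing under reset and, once pending_cry_ios is
 * drained, calls spdk_for_each_channel_continue() and clears the iterator.
 */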
219 
220 /* This is the crypto per IO context that the bdev layer allocates for us opaquely and attaches to
221  * each IO for us.
222  */
223 struct crypto_bdev_io {
224 	int cryop_cnt_remaining;			/* counter used when completing crypto ops */
225 	struct crypto_io_channel *crypto_ch;		/* need to store for crypto completion handling */
226 	struct vbdev_crypto *crypto_bdev;		/* the crypto node struct associated with this IO */
227 	struct spdk_bdev_io *orig_io;			/* the original IO */
228 	struct spdk_bdev_io *read_io;			/* the read IO we issued */
229 	int8_t bdev_io_status;				/* the status we'll report back on the bdev IO */
230 	bool on_pending_list;
231 	/* Used for the single contiguous buffer that serves as the crypto destination target for writes */
232 	uint64_t aux_num_blocks;			/* num of blocks for the contiguous buffer */
233 	uint64_t aux_offset_blocks;			/* block offset on media */
234 	void *aux_buf_raw;				/* raw buffer that the bdev layer gave us for write buffer */
235 	struct iovec aux_buf_iov;			/* iov representing aligned contig write buffer */
236 
237 	/* for bdev_io_wait */
238 	struct spdk_bdev_io_wait_entry bdev_io_wait;
239 	struct spdk_io_channel *ch;
240 };
241 
242 /* Called by vbdev_crypto_init_crypto_drivers() to init each discovered crypto device */
243 static int
244 create_vbdev_dev(uint8_t index, uint16_t num_lcores)
245 {
246 	struct vbdev_dev *device;
247 	uint8_t j, cdev_id, cdrv_id;
248 	struct device_qp *dev_qp;
249 	struct device_qp *tmp_qp;
250 	uint32_t qp_desc_nr;
251 	int rc;
252 	TAILQ_HEAD(device_qps, device_qp) *dev_qp_head;
253 
254 	device = calloc(1, sizeof(struct vbdev_dev));
255 	if (!device) {
256 		return -ENOMEM;
257 	}
258 
259 	/* Get details about this device. */
260 	rte_cryptodev_info_get(index, &device->cdev_info);
261 	cdrv_id = device->cdev_info.driver_id;
262 	cdev_id = device->cdev_id = index;
263 
264 	/* QAT_ASYM devices are not supported at this time. */
265 	if (strcmp(device->cdev_info.driver_name, QAT_ASYM) == 0) {
266 		free(device);
267 		return 0;
268 	}
269 
270 	/* Before going any further, make sure we have enough resources for this
271 	 * device type to function.  We need a unique queue pair per core across each
272 	 * device type to remain lockless....
273 	 */
274 	if ((rte_cryptodev_device_count_by_driver(cdrv_id) *
275 	     device->cdev_info.max_nb_queue_pairs) < num_lcores) {
276 		SPDK_ERRLOG("Insufficient unique queue pairs available for %s\n",
277 			    device->cdev_info.driver_name);
278 		SPDK_ERRLOG("Either add more crypto devices or decrease core count\n");
279 		rc = -EINVAL;
280 		goto err;
281 	}
282 
283 	/* Setup queue pairs. */
284 	struct rte_cryptodev_config conf = {
285 		.nb_queue_pairs = device->cdev_info.max_nb_queue_pairs,
286 		.socket_id = SPDK_ENV_SOCKET_ID_ANY
287 	};
288 
289 	rc = rte_cryptodev_configure(cdev_id, &conf);
290 	if (rc < 0) {
291 		SPDK_ERRLOG("Failed to configure cryptodev %u: error %d\n",
292 			    cdev_id, rc);
293 		rc = -EINVAL;
294 		goto err;
295 	}
296 
297 	/* Select the right device/qp list based on driver name
298 	 * or error if it does not exist.
299 	 */
300 	if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
301 		dev_qp_head = (struct device_qps *)&g_device_qp_qat;
302 		qp_desc_nr = CRYPTO_QP_DESCRIPTORS;
303 	} else if (strcmp(device->cdev_info.driver_name, AESNI_MB) == 0) {
304 		dev_qp_head = (struct device_qps *)&g_device_qp_aesni_mb;
305 		qp_desc_nr = CRYPTO_QP_DESCRIPTORS;
306 	} else if (strcmp(device->cdev_info.driver_name, MLX5) == 0) {
307 		dev_qp_head = (struct device_qps *)&g_device_qp_mlx5;
308 		qp_desc_nr = CRYPTO_QP_DESCRIPTORS_MLX5;
309 	} else {
310 		SPDK_ERRLOG("Failed to start device %u. Invalid driver name \"%s\"\n",
311 			    cdev_id, device->cdev_info.driver_name);
312 		rc = -EINVAL;
313 		goto err_qp_setup;
314 	}
315 
316 	struct rte_cryptodev_qp_conf qp_conf = {
317 		.nb_descriptors = qp_desc_nr,
318 		.mp_session = g_session_mp,
319 		.mp_session_private = g_session_mp_priv,
320 	};
321 
322 	/* Pre-setup all potential qpairs now and assign them in the channel
323 	 * callback. If we were to create them there, we'd have to stop the
324 	 * entire device affecting all other threads that might be using it
325 	 * even on other queue pairs.
326 	 */
327 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
328 		rc = rte_cryptodev_queue_pair_setup(cdev_id, j, &qp_conf, SOCKET_ID_ANY);
329 		if (rc < 0) {
330 			SPDK_ERRLOG("Failed to setup queue pair %u on "
331 				    "cryptodev %u: error %d\n", j, cdev_id, rc);
332 			rc = -EINVAL;
333 			goto err_qp_setup;
334 		}
335 	}
336 
337 	rc = rte_cryptodev_start(cdev_id);
338 	if (rc < 0) {
339 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
340 			    cdev_id, rc);
341 		rc = -EINVAL;
342 		goto err_dev_start;
343 	}
344 
345 	/* Build up lists of device/qp combinations per PMD */
346 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
347 		dev_qp = calloc(1, sizeof(struct device_qp));
348 		if (!dev_qp) {
349 			rc = -ENOMEM;
350 			goto err_qp_alloc;
351 		}
352 		dev_qp->device = device;
353 		dev_qp->qp = j;
354 		dev_qp->in_use = false;
355 		if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
356 			g_qat_total_qp++;
357 		}
358 		TAILQ_INSERT_TAIL(dev_qp_head, dev_qp, link);
359 	}
360 
361 	/* Add to our list of available crypto devices. */
362 	TAILQ_INSERT_TAIL(&g_vbdev_devs, device, link);
363 
364 	return 0;
365 err_qp_alloc:
366 	TAILQ_FOREACH_SAFE(dev_qp, dev_qp_head, link, tmp_qp) {
367 		if (dev_qp->device->cdev_id != device->cdev_id) {
368 			continue;
369 		}
370 		TAILQ_REMOVE(dev_qp_head, dev_qp, link);
371 		if (dev_qp_head == (struct device_qps *)&g_device_qp_qat) {
372 			g_qat_total_qp--;
373 		}
374 		free(dev_qp);
375 	}
376 	rte_cryptodev_stop(cdev_id);
377 err_dev_start:
378 err_qp_setup:
379 	rte_cryptodev_close(cdev_id);
380 err:
381 	free(device);
382 
383 	return rc;
384 }
385 
386 static void
387 release_vbdev_dev(struct vbdev_dev *device)
388 {
389 	struct device_qp *dev_qp;
390 	struct device_qp *tmp_qp;
391 	TAILQ_HEAD(device_qps, device_qp) *dev_qp_head = NULL;
392 
393 	assert(device);
394 
395 	/* Select the right device/qp list based on driver name. */
396 	if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
397 		dev_qp_head = (struct device_qps *)&g_device_qp_qat;
398 	} else if (strcmp(device->cdev_info.driver_name, AESNI_MB) == 0) {
399 		dev_qp_head = (struct device_qps *)&g_device_qp_aesni_mb;
400 	} else if (strcmp(device->cdev_info.driver_name, MLX5) == 0) {
401 		dev_qp_head = (struct device_qps *)&g_device_qp_mlx5;
402 	}
403 	if (dev_qp_head) {
404 		TAILQ_FOREACH_SAFE(dev_qp, dev_qp_head, link, tmp_qp) {
405 			/* Remove only qps of our device even if the driver name matches. */
406 			if (dev_qp->device->cdev_id != device->cdev_id) {
407 				continue;
408 			}
409 			TAILQ_REMOVE(dev_qp_head, dev_qp, link);
410 			if (dev_qp_head == (struct device_qps *)&g_device_qp_qat) {
411 				g_qat_total_qp--;
412 			}
413 			free(dev_qp);
414 		}
415 	}
416 	rte_cryptodev_stop(device->cdev_id);
417 	rte_cryptodev_close(device->cdev_id);
418 	free(device);
419 }
420 
421 /* Dummy function used by DPDK to free ext attached buffers to mbufs; we free them ourselves, but
422  * this callback has to be here. */
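/* The mbufs never own their data: rte_pktmbuf_attach_extbuf() only points them at
 * buffers the bdev layer already owns, so there is nothing for DPDK to free here.
 */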
423 static void shinfo_free_cb(void *arg1, void *arg2)
424 {
425 }
426 
427 /* This is called from the module's init function. We setup all crypto devices early on as we are unable
428  * to easily dynamically configure queue pairs after the drivers are up and running.  So, here, we
429  * configure the max capabilities of each device and assign threads to queue pairs as channels are
430  * requested.
431  */
432 static int
433 vbdev_crypto_init_crypto_drivers(void)
434 {
435 	uint8_t cdev_count;
436 	uint8_t cdev_id;
437 	int i, rc;
438 	struct vbdev_dev *device;
439 	struct vbdev_dev *tmp_dev;
440 	struct device_qp *dev_qp;
441 	unsigned int max_sess_size = 0, sess_size;
442 	uint16_t num_lcores = rte_lcore_count();
443 	char aesni_args[32];
444 
445 	/* Only the first call, via RPC or module init should init the crypto drivers. */
446 	if (g_session_mp != NULL) {
447 		return 0;
448 	}
449 
450 	/* We always init AESNI_MB */
451 	snprintf(aesni_args, sizeof(aesni_args), "max_nb_queue_pairs=%d", AESNI_MB_NUM_QP);
452 	rc = rte_vdev_init(AESNI_MB, aesni_args);
453 	if (rc) {
454 		SPDK_NOTICELOG("Failed to create virtual PMD %s: error %d. "
455 			       "Possibly %s is not supported by DPDK library. "
456 			       "Keep going...\n", AESNI_MB, rc, AESNI_MB);
457 	}
458 
459 	/* If we have no crypto devices, there's no reason to continue. */
460 	cdev_count = rte_cryptodev_count();
461 	SPDK_NOTICELOG("Found crypto devices: %d\n", (int)cdev_count);
462 	if (cdev_count == 0) {
463 		return 0;
464 	}
465 
466 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
467 	if (g_mbuf_offset < 0) {
468 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
469 		return -EINVAL;
470 	}
471 
472 	/*
473 	 * Create global mempools, shared by all devices regardless of type.
474 	 */
475 
476 	/* First determine max session size, most pools are shared by all the devices,
477 	 * so we need to find the global max sessions size.
478 	 */
479 	for (cdev_id = 0; cdev_id < cdev_count; cdev_id++) {
480 		sess_size = rte_cryptodev_sym_get_private_session_size(cdev_id);
481 		if (sess_size > max_sess_size) {
482 			max_sess_size = sess_size;
483 		}
484 	}
485 
486 	g_session_mp_priv = rte_mempool_create("session_mp_priv", NUM_SESSIONS, max_sess_size,
487 					       SESS_MEMPOOL_CACHE_SIZE, 0, NULL, NULL, NULL,
488 					       NULL, SOCKET_ID_ANY, 0);
489 	if (g_session_mp_priv == NULL) {
490 		SPDK_ERRLOG("Cannot create private session pool max size 0x%x\n", max_sess_size);
491 		return -ENOMEM;
492 	}
493 
494 	g_session_mp = rte_cryptodev_sym_session_pool_create(
495 			       "session_mp",
496 			       NUM_SESSIONS, 0, SESS_MEMPOOL_CACHE_SIZE, 0,
497 			       SOCKET_ID_ANY);
498 	if (g_session_mp == NULL) {
499 		SPDK_ERRLOG("Cannot create session pool max size 0x%x\n", max_sess_size);
500 		rc = -ENOMEM;
501 		goto error_create_session_mp;
502 	}
503 
504 	g_mbuf_mp = rte_pktmbuf_pool_create("mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
505 					    0, 0, SPDK_ENV_SOCKET_ID_ANY);
506 	if (g_mbuf_mp == NULL) {
507 		SPDK_ERRLOG("Cannot create mbuf pool\n");
508 		rc = -ENOMEM;
509 		goto error_create_mbuf;
510 	}
511 
512 	/* We use per op private data, as suggested by DPDK, to store the IV and
513 	 * our own struct for queueing ops.
514 	 */
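	/* The private size requested below must match the layout assumed by IV_OFFSET
	 * and QUEUED_OP_OFFSET: the two xforms, the 16-byte IV and the queued-op struct.
	 */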
515 	g_crypto_op_mp = rte_crypto_op_pool_create("op_mp",
516 			 RTE_CRYPTO_OP_TYPE_SYMMETRIC,
517 			 NUM_MBUFS,
518 			 POOL_CACHE_SIZE,
519 			 (DEFAULT_NUM_XFORMS *
520 			  sizeof(struct rte_crypto_sym_xform)) +
521 			 IV_LENGTH + QUEUED_OP_LENGTH,
522 			 rte_socket_id());
523 
524 	if (g_crypto_op_mp == NULL) {
525 		SPDK_ERRLOG("Cannot create op pool\n");
526 		rc = -ENOMEM;
527 		goto error_create_op;
528 	}
529 
530 	/* Init all devices */
531 	for (i = 0; i < cdev_count; i++) {
532 		rc = create_vbdev_dev(i, num_lcores);
533 		if (rc) {
534 			goto err;
535 		}
536 	}
537 
538 	/* Assign index values to the QAT device qp nodes so that we can
539 	 * assign them for optimal performance.
540 	 */
541 	i = 0;
542 	TAILQ_FOREACH(dev_qp, &g_device_qp_qat, link) {
543 		dev_qp->index = i++;
544 	}
545 
546 	g_shinfo.free_cb = shinfo_free_cb;
547 	return 0;
548 
549 	/* Error cleanup paths. */
550 err:
551 	TAILQ_FOREACH_SAFE(device, &g_vbdev_devs, link, tmp_dev) {
552 		TAILQ_REMOVE(&g_vbdev_devs, device, link);
553 		release_vbdev_dev(device);
554 	}
555 	rte_mempool_free(g_crypto_op_mp);
556 	g_crypto_op_mp = NULL;
557 error_create_op:
558 	rte_mempool_free(g_mbuf_mp);
559 	g_mbuf_mp = NULL;
560 error_create_mbuf:
561 	rte_mempool_free(g_session_mp);
562 	g_session_mp = NULL;
563 error_create_session_mp:
564 	if (g_session_mp_priv != NULL) {
565 		rte_mempool_free(g_session_mp_priv);
566 		g_session_mp_priv = NULL;
567 	}
568 	return rc;
569 }
570 
571 /* Following an encrypt or decrypt we need to then either write the encrypted data or finish
572  * the read on decrypted data. Do that here.
573  */
574 static void
575 _crypto_operation_complete(struct spdk_bdev_io *bdev_io)
576 {
577 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
578 					   crypto_bdev);
579 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
580 	struct crypto_io_channel *crypto_ch = io_ctx->crypto_ch;
581 	struct spdk_bdev_io *free_me = io_ctx->read_io;
582 	int rc = 0;
583 
584 	/* Can also be called from the crypto_dev_poller() to fail the stuck re-enqueue ops IO. */
585 	if (io_ctx->on_pending_list) {
586 		TAILQ_REMOVE(&crypto_ch->pending_cry_ios, bdev_io, module_link);
587 		io_ctx->on_pending_list = false;
588 	}
589 
590 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
591 
592 		/* Complete the original IO and then free the one that we created
593 		 * as a result of issuing an IO via submit_request.
594 		 */
595 		if (io_ctx->bdev_io_status != SPDK_BDEV_IO_STATUS_FAILED) {
596 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
597 		} else {
598 			SPDK_ERRLOG("Issue with decryption on bdev_io %p\n", bdev_io);
599 			rc = -EINVAL;
600 		}
601 		spdk_bdev_free_io(free_me);
602 
603 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
604 
605 		if (io_ctx->bdev_io_status != SPDK_BDEV_IO_STATUS_FAILED) {
606 			/* Write the encrypted data. */
607 			rc = spdk_bdev_writev_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
608 						     &io_ctx->aux_buf_iov, 1, io_ctx->aux_offset_blocks,
609 						     io_ctx->aux_num_blocks, _complete_internal_write,
610 						     bdev_io);
611 		} else {
612 			SPDK_ERRLOG("Issue with encryption on bdev_io %p\n", bdev_io);
613 			rc = -EINVAL;
614 		}
615 
616 	} else {
617 		SPDK_ERRLOG("Unknown bdev type %u on crypto operation completion\n",
618 			    bdev_io->type);
619 		rc = -EINVAL;
620 	}
621 
622 	if (rc) {
623 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
624 	}
625 }
626 
627 static void
628 cancel_queued_crypto_ops(struct crypto_io_channel *crypto_ch, struct spdk_bdev_io *bdev_io)
629 {
630 	struct rte_mbuf *mbufs_to_free[2 * MAX_DEQUEUE_BURST_SIZE];
631 	struct rte_crypto_op *dequeued_ops[MAX_DEQUEUE_BURST_SIZE];
632 	struct vbdev_crypto_op *op_to_cancel, *tmp_op;
633 	struct rte_crypto_op *crypto_op;
634 	int num_mbufs, num_dequeued_ops;
635 
636 	/* Remove all ops from the failed IO. Since we don't know the
637 	 * order we have to check them all. */
638 	num_mbufs = 0;
639 	num_dequeued_ops = 0;
640 	TAILQ_FOREACH_SAFE(op_to_cancel, &crypto_ch->queued_cry_ops, link, tmp_op) {
641 		/* Checking if this is our op. One IO contains multiple ops. */
642 		if (bdev_io == op_to_cancel->bdev_io) {
643 			crypto_op = op_to_cancel->crypto_op;
644 			TAILQ_REMOVE(&crypto_ch->queued_cry_ops, op_to_cancel, link);
645 
646 			/* Populating lists for freeing mbufs and ops. */
647 			mbufs_to_free[num_mbufs++] = (void *)crypto_op->sym->m_src;
648 			if (crypto_op->sym->m_dst) {
649 				mbufs_to_free[num_mbufs++] = (void *)crypto_op->sym->m_dst;
650 			}
651 			dequeued_ops[num_dequeued_ops++] = crypto_op;
652 		}
653 	}
654 
655 	/* Now bulk free both mbufs and crypto operations. */
656 	if (num_dequeued_ops > 0) {
657 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)dequeued_ops,
658 				     num_dequeued_ops);
659 		assert(num_mbufs > 0);
660 		/* This also releases chained mbufs if any. */
661 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
662 	}
663 }
664 
665 static int _crypto_operation(struct spdk_bdev_io *bdev_io,
666 			     enum rte_crypto_cipher_operation crypto_op,
667 			     void *aux_buf);
668 
669 /* This is the poller for the crypto device. It uses a single API to dequeue whatever is ready at
670  * the device. Then we need to decide if what we've got so far (including previous poller
671  * runs) totals up to one or more complete bdev_ios and if so continue with the bdev_io
672  * accordingly. This means either completing a read or issuing a new write.
673  */
674 static int
675 crypto_dev_poller(void *args)
676 {
677 	struct crypto_io_channel *crypto_ch = args;
678 	uint8_t cdev_id = crypto_ch->device_qp->device->cdev_id;
679 	int i, num_dequeued_ops, num_enqueued_ops;
680 	struct spdk_bdev_io *bdev_io = NULL;
681 	struct crypto_bdev_io *io_ctx = NULL;
682 	struct rte_crypto_op *dequeued_ops[MAX_DEQUEUE_BURST_SIZE];
683 	struct rte_mbuf *mbufs_to_free[2 * MAX_DEQUEUE_BURST_SIZE];
684 	int num_mbufs = 0;
685 	struct vbdev_crypto_op *op_to_resubmit;
686 
687 	/* Each run of the poller will get just what the device has available
688 	 * at the moment we call it, we don't check again after draining the
689 	 * first batch.
690 	 */
691 	num_dequeued_ops = rte_cryptodev_dequeue_burst(cdev_id, crypto_ch->device_qp->qp,
692 			   dequeued_ops, MAX_DEQUEUE_BURST_SIZE);
693 
694 	/* Check if operation was processed successfully */
695 	for (i = 0; i < num_dequeued_ops; i++) {
696 
697 		/* We don't know the order or association of the crypto ops wrt any
698 		 * particular bdev_io so need to look at each and determine if it's
699 		 * the last one for its bdev_io or not.
700 		 */
701 		bdev_io = (struct spdk_bdev_io *)*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset,
702 				uint64_t *);
703 		assert(bdev_io != NULL);
704 		io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
705 
706 		if (dequeued_ops[i]->status != RTE_CRYPTO_OP_STATUS_SUCCESS) {
707 			SPDK_ERRLOG("error with op %d status %u\n", i,
708 				    dequeued_ops[i]->status);
709 			/* Update the bdev status to error, we'll still process the
710 			 * rest of the crypto ops for this bdev_io though so they
711 			 * aren't left hanging.
712 			 */
713 			io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
714 		}
715 
716 		assert(io_ctx->cryop_cnt_remaining > 0);
717 
718 		/* Return the associated src and dst mbufs by collecting them into
719 		 * an array that we can use the bulk API to free after the loop.
720 		 */
721 		*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset, uint64_t *) = 0;
722 		mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_src;
723 		if (dequeued_ops[i]->sym->m_dst) {
724 			mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_dst;
725 		}
726 
727 		/* Done with all crypto ops for this bdev_io, complete it. */
728 		if (--io_ctx->cryop_cnt_remaining == 0) {
729 
730 			/* If we're completing this with an outstanding reset we need
731 			 * to fail it.
732 			 */
733 			if (crypto_ch->iter) {
734 				io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
735 			}
736 
737 			/* Complete the IO */
738 			_crypto_operation_complete(bdev_io);
739 		}
740 	}
741 
742 	/* Now bulk free both mbufs and crypto operations. */
743 	if (num_dequeued_ops > 0) {
744 		rte_mempool_put_bulk(g_crypto_op_mp,
745 				     (void **)dequeued_ops,
746 				     num_dequeued_ops);
747 		assert(num_mbufs > 0);
748 		/* This also releases chained mbufs if any. */
749 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
750 	}
751 
752 	/* Check if there are any pending crypto ops to process */
753 	while (!TAILQ_EMPTY(&crypto_ch->queued_cry_ops)) {
754 		op_to_resubmit = TAILQ_FIRST(&crypto_ch->queued_cry_ops);
755 		bdev_io = op_to_resubmit->bdev_io;
756 		io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
757 		num_enqueued_ops = rte_cryptodev_enqueue_burst(op_to_resubmit->cdev_id,
758 				   op_to_resubmit->qp,
759 				   &op_to_resubmit->crypto_op,
760 				   1);
761 		if (num_enqueued_ops == 1) {
762 			/* Make sure we don't put this on twice as one bdev_io is made up
763 			 * of many crypto ops.
764 			 */
765 			if (io_ctx->on_pending_list == false) {
766 				TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
767 				io_ctx->on_pending_list = true;
768 			}
769 			TAILQ_REMOVE(&crypto_ch->queued_cry_ops, op_to_resubmit, link);
770 		} else {
771 			if (op_to_resubmit->crypto_op->status == RTE_CRYPTO_OP_STATUS_NOT_PROCESSED) {
772 				/* If we couldn't get one, just break and try again later. */
773 				break;
774 			} else {
775 				/* Something is really wrong with the op. Most probably the
776 				 * mbuf is broken or the HW is not able to process the request.
777 				 * Fail the IO and remove its ops from the queued ops list. */
778 				io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
779 
780 				cancel_queued_crypto_ops(crypto_ch, bdev_io);
781 
782 				/* Fail the IO if there is nothing left on device. */
783 				if (--io_ctx->cryop_cnt_remaining == 0) {
784 					_crypto_operation_complete(bdev_io);
785 				}
786 			}
787 
788 		}
789 	}
790 
791 	/* If the channel iter is not NULL, we need to continue to poll
792 	 * until the pending list is empty, then we can move on to the
793 	 * next channel.
794 	 */
795 	if (crypto_ch->iter && TAILQ_EMPTY(&crypto_ch->pending_cry_ios)) {
796 		SPDK_NOTICELOG("Channel %p has been quiesced.\n", crypto_ch);
797 		spdk_for_each_channel_continue(crypto_ch->iter, 0);
798 		crypto_ch->iter = NULL;
799 	}
800 
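	/* A positive return marks this poller run as busy to the SPDK thread library;
	 * zero counts as idle.
	 */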
801 	return num_dequeued_ops;
802 }
803 
804 /* Allocate the new mbuf of @remainder size with data pointed by @addr and attach
805  * it to the @orig_mbuf. */
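/* This handles the case where spdk_vtophys() cannot cover a whole logical block with
 * one physically contiguous range (e.g. the buffer straddles a mapping boundary);
 * the leftover bytes get their own ext-buf mbuf chained onto the first one.
 */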
806 static int
807 mbuf_chain_remainder(struct spdk_bdev_io *bdev_io, struct rte_mbuf *orig_mbuf,
808 		     uint8_t *addr, uint32_t remainder)
809 {
810 	uint64_t phys_addr, phys_len;
811 	struct rte_mbuf *chain_mbuf;
812 	int rc;
813 
814 	phys_len = remainder;
815 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
816 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len != remainder)) {
817 		return -EFAULT;
818 	}
819 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&chain_mbuf, 1);
820 	if (spdk_unlikely(rc)) {
821 		return -ENOMEM;
822 	}
823 	/* Store context in every mbuf as we don't know anything about completion order */
824 	*RTE_MBUF_DYNFIELD(chain_mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)bdev_io;
825 	rte_pktmbuf_attach_extbuf(chain_mbuf, addr, phys_addr, phys_len, &g_shinfo);
826 	rte_pktmbuf_append(chain_mbuf, phys_len);
827 
828 	/* Chained buffer is released by rte_pktmbuf_free_bulk() automatically. */
829 	rte_pktmbuf_chain(orig_mbuf, chain_mbuf);
830 	return 0;
831 }
832 
833 /* Attach data buffer pointed by @addr to @mbuf. Return utilized len of the
834  * contiguous space that was physically available. */
835 static uint64_t
836 mbuf_attach_buf(struct spdk_bdev_io *bdev_io, struct rte_mbuf *mbuf,
837 		uint8_t *addr, uint32_t len)
838 {
839 	uint64_t phys_addr, phys_len;
840 
841 	/* Store context in every mbuf as we don't know anything about completion order */
842 	*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)bdev_io;
843 
844 	phys_len = len;
845 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
846 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len == 0)) {
847 		return 0;
848 	}
849 	assert(phys_len <= len);
850 
851 	/* Set the mbuf elements address and length. */
852 	rte_pktmbuf_attach_extbuf(mbuf, addr, phys_addr, phys_len, &g_shinfo);
853 	rte_pktmbuf_append(mbuf, phys_len);
854 
855 	return phys_len;
856 }
857 
858 /* We're either encrypting on the way down or decrypting on the way back. */
859 static int
860 _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation crypto_op,
861 		  void *aux_buf)
862 {
863 	uint16_t num_enqueued_ops = 0;
864 	uint32_t cryop_cnt = bdev_io->u.bdev.num_blocks;
865 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
866 	struct crypto_io_channel *crypto_ch = io_ctx->crypto_ch;
867 	uint8_t cdev_id = crypto_ch->device_qp->device->cdev_id;
868 	uint32_t crypto_len = io_ctx->crypto_bdev->crypto_bdev.blocklen;
869 	uint64_t total_length = bdev_io->u.bdev.num_blocks * crypto_len;
870 	int rc;
871 	uint32_t iov_index = 0;
872 	uint32_t allocated = 0;
873 	uint8_t *current_iov = NULL;
874 	uint64_t total_remaining = 0;
875 	uint64_t current_iov_remaining = 0;
876 	uint32_t crypto_index = 0;
877 	uint32_t en_offset = 0;
878 	struct rte_crypto_op *crypto_ops[MAX_ENQUEUE_ARRAY_SIZE];
879 	struct rte_mbuf *src_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
880 	struct rte_mbuf *dst_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
881 	int burst;
882 	struct vbdev_crypto_op *op_to_queue;
883 	uint64_t alignment = spdk_bdev_get_buf_align(&io_ctx->crypto_bdev->crypto_bdev);
884 
885 	assert((bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen) <= CRYPTO_MAX_IO);
886 
887 	/* Get the number of source mbufs that we need. These will always be 1:1 because we
888 	 * don't support chaining. The reason we don't is because of our decision to use
889 	 * LBA as IV, there can be no case where we'd need >1 mbuf per crypto op or the
890 	 * op would be > 1 LBA.
891 	 */
892 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, cryop_cnt);
893 	if (rc) {
894 		SPDK_ERRLOG("Failed to get src_mbufs!\n");
895 		return -ENOMEM;
896 	}
897 
898 	/* Get the same amount but these buffers to describe the encrypted data location (dst). */
899 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
900 		rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, dst_mbufs, cryop_cnt);
901 		if (rc) {
902 			SPDK_ERRLOG("Failed to get dst_mbufs!\n");
903 			rc = -ENOMEM;
904 			goto error_get_dst;
905 		}
906 	}
907 
908 #ifdef __clang_analyzer__
909 	/* silence scan-build false positive */
910 	SPDK_CLANG_ANALYZER_PREINIT_PTR_ARRAY(crypto_ops, MAX_ENQUEUE_ARRAY_SIZE, 0x1000);
911 #endif
912 	/* Allocate crypto operations. */
913 	allocated = rte_crypto_op_bulk_alloc(g_crypto_op_mp,
914 					     RTE_CRYPTO_OP_TYPE_SYMMETRIC,
915 					     crypto_ops, cryop_cnt);
916 	if (allocated < cryop_cnt) {
917 		SPDK_ERRLOG("Failed to allocate crypto ops!\n");
918 		rc = -ENOMEM;
919 		goto error_get_ops;
920 	}
921 
922 	/* For encryption, we need to prepare a single contiguous buffer as the encryption
923 	 * destination; we'll then pass that along for the write after encryption is done.
924 	 * This is done to avoid encrypting the provided write buffer, which may be
925 	 * undesirable in some use cases.
926 	 */
927 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
928 		io_ctx->aux_buf_iov.iov_len = total_length;
929 		io_ctx->aux_buf_raw = aux_buf;
930 		io_ctx->aux_buf_iov.iov_base  = (void *)(((uintptr_t)aux_buf + (alignment - 1)) & ~(alignment - 1));
931 		io_ctx->aux_offset_blocks = bdev_io->u.bdev.offset_blocks;
932 		io_ctx->aux_num_blocks = bdev_io->u.bdev.num_blocks;
933 	}
934 
935 	/* This value is used in the completion callback to determine when the bdev_io is
936 	 * complete.
937 	 */
938 	io_ctx->cryop_cnt_remaining = cryop_cnt;
939 
940 	/* As we don't support chaining because of a decision to use LBA as IV, construction
941 	 * of crypto operations is straightforward. We build both the op, the mbuf and the
942 	 * dst_mbuf in our local arrays by looping through the length of the bdev IO and
943 	 * picking off LBA sized blocks of memory from the IOVs as we walk through them. Each
944 	 * LBA sized chunk of memory will correspond 1:1 to a crypto operation and a single
945 	 * mbuf per crypto operation.
946 	 */
947 	total_remaining = total_length;
948 	current_iov = bdev_io->u.bdev.iovs[iov_index].iov_base;
949 	current_iov_remaining = bdev_io->u.bdev.iovs[iov_index].iov_len;
950 	do {
951 		uint8_t *iv_ptr;
952 		uint8_t *buf_addr;
953 		uint64_t phys_len;
954 		uint32_t remainder;
955 		uint64_t op_block_offset;
956 
957 		phys_len = mbuf_attach_buf(bdev_io, src_mbufs[crypto_index],
958 					   current_iov, crypto_len);
959 		if (spdk_unlikely(phys_len == 0)) {
960 			rc = -EFAULT;
961 			goto error_attach_session;
962 		}
963 
964 		/* Handle the case of page boundary. */
965 		remainder = crypto_len - phys_len;
966 		if (spdk_unlikely(remainder > 0)) {
967 			rc = mbuf_chain_remainder(bdev_io, src_mbufs[crypto_index],
968 						  current_iov + phys_len, remainder);
969 			if (spdk_unlikely(rc)) {
970 				goto error_attach_session;
971 			}
972 		}
973 
974 		/* Set the IV - we use the LBA of the crypto_op */
975 		iv_ptr = rte_crypto_op_ctod_offset(crypto_ops[crypto_index], uint8_t *,
976 						   IV_OFFSET);
977 		memset(iv_ptr, 0, IV_LENGTH);
978 		op_block_offset = bdev_io->u.bdev.offset_blocks + crypto_index;
979 		rte_memcpy(iv_ptr, &op_block_offset, sizeof(uint64_t));
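		/* The 16-byte IV thus holds the 64-bit LBA in host byte order followed by
		 * 8 zero bytes, giving every block a unique, deterministic IV.
		 */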
980 
981 		/* Set the data to encrypt/decrypt length */
982 		crypto_ops[crypto_index]->sym->cipher.data.length = crypto_len;
983 		crypto_ops[crypto_index]->sym->cipher.data.offset = 0;
984 
985 		/* link the mbuf to the crypto op. */
986 		crypto_ops[crypto_index]->sym->m_src = src_mbufs[crypto_index];
987 
988 		/* For encrypt, point the destination to a buffer we allocate and redirect the bdev_io
989 		 * that will be used to process the write on completion to the same buffer. Setting
990 		 * up the en_buffer is a little simpler as we know the destination buffer is single IOV.
991 		 */
992 		if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
993 			buf_addr = io_ctx->aux_buf_iov.iov_base + en_offset;
994 			phys_len = mbuf_attach_buf(bdev_io, dst_mbufs[crypto_index],
995 						   buf_addr, crypto_len);
996 			if (spdk_unlikely(phys_len == 0)) {
997 				rc = -EFAULT;
998 				goto error_attach_session;
999 			}
1000 
1001 			crypto_ops[crypto_index]->sym->m_dst = dst_mbufs[crypto_index];
1002 			en_offset += phys_len;
1003 
1004 			/* Handle the case of page boundary. */
1005 			remainder = crypto_len - phys_len;
1006 			if (spdk_unlikely(remainder > 0)) {
1007 				rc = mbuf_chain_remainder(bdev_io, dst_mbufs[crypto_index],
1008 							  buf_addr + phys_len, remainder);
1009 				if (spdk_unlikely(rc)) {
1010 					goto error_attach_session;
1011 				}
1012 				en_offset += remainder;
1013 			}
1014 
1015 			/* Attach the crypto session to the operation */
1016 			rc = rte_crypto_op_attach_sym_session(crypto_ops[crypto_index],
1017 							      io_ctx->crypto_bdev->session_encrypt);
1018 			if (rc) {
1019 				rc = -EINVAL;
1020 				goto error_attach_session;
1021 			}
1022 		} else {
1023 			crypto_ops[crypto_index]->sym->m_dst = NULL;
1024 
1025 			/* Attach the crypto session to the operation */
1026 			rc = rte_crypto_op_attach_sym_session(crypto_ops[crypto_index],
1027 							      io_ctx->crypto_bdev->session_decrypt);
1028 			if (rc) {
1029 				rc = -EINVAL;
1030 				goto error_attach_session;
1031 			}
1032 		}
1033 
1034 		/* Subtract our running totals for the op in progress and the overall bdev io */
1035 		total_remaining -= crypto_len;
1036 		current_iov_remaining -= crypto_len;
1037 
1038 		/* move our current IOV pointer accordingly. */
1039 		current_iov += crypto_len;
1040 
1041 		/* move on to the next crypto operation */
1042 		crypto_index++;
1043 
1044 		/* If we're done with this IOV, move to the next one. */
1045 		if (current_iov_remaining == 0 && total_remaining > 0) {
1046 			iov_index++;
1047 			current_iov = bdev_io->u.bdev.iovs[iov_index].iov_base;
1048 			current_iov_remaining = bdev_io->u.bdev.iovs[iov_index].iov_len;
1049 		}
1050 	} while (total_remaining > 0);
1051 
1052 	/* Enqueue everything we've got but limit by the max number of descriptors we
1053 	 * configured the crypto device for.
1054 	 */
1055 	burst = spdk_min(cryop_cnt, io_ctx->crypto_bdev->qp_desc_nr);
1056 	num_enqueued_ops = rte_cryptodev_enqueue_burst(cdev_id, crypto_ch->device_qp->qp,
1057 			   &crypto_ops[0],
1058 			   burst);
1059 
1060 	/* Add this bdev_io to our outstanding list if any of its crypto ops made it. */
1061 	if (num_enqueued_ops > 0) {
1062 		TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
1063 		io_ctx->on_pending_list = true;
1064 	}
1065 	/* We were unable to enqueue everything but did get some, so need to decide what
1066 	 * to do based on the status of the last op.
1067 	 */
1068 	if (num_enqueued_ops < cryop_cnt) {
1069 		switch (crypto_ops[num_enqueued_ops]->status) {
1070 		case RTE_CRYPTO_OP_STATUS_NOT_PROCESSED:
1071 			/* Queue them up on a linked list to be resubmitted via the poller. */
1072 			for (crypto_index = num_enqueued_ops; crypto_index < cryop_cnt; crypto_index++) {
1073 				op_to_queue = (struct vbdev_crypto_op *)rte_crypto_op_ctod_offset(crypto_ops[crypto_index],
1074 						uint8_t *, QUEUED_OP_OFFSET);
1075 				op_to_queue->cdev_id = cdev_id;
1076 				op_to_queue->qp = crypto_ch->device_qp->qp;
1077 				op_to_queue->crypto_op = crypto_ops[crypto_index];
1078 				op_to_queue->bdev_io = bdev_io;
1079 				TAILQ_INSERT_TAIL(&crypto_ch->queued_cry_ops,
1080 						  op_to_queue,
1081 						  link);
1082 			}
1083 			break;
1084 		default:
1085 			/* For all other statuses, set the io_ctx bdev_io status so that
1086 			 * the poller will pick the failure up for the overall bdev status.
1087 			 */
1088 			io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
1089 			if (num_enqueued_ops == 0) {
1090 				/* If nothing was enqueued and the failure wasn't because the
1091 				 * device was busy, fail the IO now as the poller won't know anything about it.
1092 				 */
1093 				rc = -EINVAL;
1094 				goto error_attach_session;
1095 			}
1096 			break;
1097 		}
1098 	}
1099 
1100 	return rc;
1101 
1102 	/* Error cleanup paths. */
1103 error_attach_session:
1104 error_get_ops:
1105 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
1106 		/* This also releases chained mbufs if any. */
1107 		rte_pktmbuf_free_bulk(dst_mbufs, cryop_cnt);
1108 	}
1109 	if (allocated > 0) {
1110 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops,
1111 				     allocated);
1112 	}
1113 error_get_dst:
1114 	/* This also releases chained mbufs if any. */
1115 	rte_pktmbuf_free_bulk(src_mbufs, cryop_cnt);
1116 	return rc;
1117 }
1118 
1119 /* This function is called after all channels have been quiesced following
1120  * a bdev reset.
1121  */
1122 static void
1123 _ch_quiesce_done(struct spdk_io_channel_iter *i, int status)
1124 {
1125 	struct crypto_bdev_io *io_ctx = spdk_io_channel_iter_get_ctx(i);
1126 
1127 	assert(TAILQ_EMPTY(&io_ctx->crypto_ch->pending_cry_ios));
1128 	assert(io_ctx->orig_io != NULL);
1129 
1130 	spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
1131 }
1132 
1133 /* This function is called per channel to quiesce IOs before completing a
1134  * bdev reset that we received.
1135  */
1136 static void
1137 _ch_quiesce(struct spdk_io_channel_iter *i)
1138 {
1139 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1140 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1141 
1142 	crypto_ch->iter = i;
1143 	/* When the poller runs, it will see the non-NULL iter and handle
1144 	 * the quiesce.
1145 	 */
1146 }
1147 
1148 /* Completion callback for IOs that were issued from this bdev other than read/write.
1149  * They have their own for readability.
1150  */
1151 static void
1152 _complete_internal_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1153 {
1154 	struct spdk_bdev_io *orig_io = cb_arg;
1155 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1156 
1157 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1158 		struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1159 
1160 		assert(orig_io == orig_ctx->orig_io);
1161 
1162 		spdk_bdev_free_io(bdev_io);
1163 
1164 		spdk_for_each_channel(orig_ctx->crypto_bdev,
1165 				      _ch_quiesce,
1166 				      orig_ctx,
1167 				      _ch_quiesce_done);
1168 		return;
1169 	}
1170 
1171 	spdk_bdev_io_complete(orig_io, status);
1172 	spdk_bdev_free_io(bdev_io);
1173 }
1174 
1175 /* Completion callback for writes that were issued from this bdev. */
1176 static void
1177 _complete_internal_write(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1178 {
1179 	struct spdk_bdev_io *orig_io = cb_arg;
1180 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1181 	struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1182 
1183 	spdk_bdev_io_put_aux_buf(orig_io, orig_ctx->aux_buf_raw);
1184 
1185 	spdk_bdev_io_complete(orig_io, status);
1186 	spdk_bdev_free_io(bdev_io);
1187 }
1188 
1189 /* Completion callback for reads that were issued from this bdev. */
1190 static void
1191 _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1192 {
1193 	struct spdk_bdev_io *orig_io = cb_arg;
1194 	struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1195 
1196 	if (success) {
1197 
1198 		/* Save off this bdev_io so it can be freed after decryption. */
1199 		orig_ctx->read_io = bdev_io;
1200 
1201 		if (!_crypto_operation(orig_io, RTE_CRYPTO_CIPHER_OP_DECRYPT, NULL)) {
1202 			return;
1203 		} else {
1204 			SPDK_ERRLOG("Failed to decrypt!\n");
1205 		}
1206 	} else {
1207 		SPDK_ERRLOG("Failed to read prior to decrypting!\n");
1208 	}
1209 
1210 	spdk_bdev_io_complete(orig_io, SPDK_BDEV_IO_STATUS_FAILED);
1211 	spdk_bdev_free_io(bdev_io);
1212 }
1213 
1214 static void
1215 vbdev_crypto_resubmit_io(void *arg)
1216 {
1217 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
1218 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1219 
1220 	vbdev_crypto_submit_request(io_ctx->ch, bdev_io);
1221 }
1222 
1223 static void
1224 vbdev_crypto_queue_io(struct spdk_bdev_io *bdev_io)
1225 {
1226 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1227 	int rc;
1228 
1229 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
1230 	io_ctx->bdev_io_wait.cb_fn = vbdev_crypto_resubmit_io;
1231 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
1232 
1233 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->crypto_ch->base_ch, &io_ctx->bdev_io_wait);
1234 	if (rc != 0) {
1235 		SPDK_ERRLOG("Queue io failed in vbdev_crypto_queue_io, rc=%d.\n", rc);
1236 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1237 	}
1238 }
1239 
1240 /* Callback for getting a buf from the bdev pool in the event that the caller passed
1241  * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
1242  * beneath us before we're done with it.
1243  */
1244 static void
1245 crypto_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
1246 		       bool success)
1247 {
1248 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
1249 					   crypto_bdev);
1250 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1251 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1252 	int rc;
1253 
1254 	if (!success) {
1255 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1256 		return;
1257 	}
1258 
1259 	rc = spdk_bdev_readv_blocks(crypto_bdev->base_desc, crypto_ch->base_ch, bdev_io->u.bdev.iovs,
1260 				    bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
1261 				    bdev_io->u.bdev.num_blocks, _complete_internal_read,
1262 				    bdev_io);
1263 	if (rc != 0) {
1264 		if (rc == -ENOMEM) {
1265 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1266 			io_ctx->ch = ch;
1267 			vbdev_crypto_queue_io(bdev_io);
1268 		} else {
1269 			SPDK_ERRLOG("Failed to submit bdev_io!\n");
1270 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1271 		}
1272 	}
1273 }
1274 
1275 /* For encryption we don't want to encrypt the data in place as the host isn't
1276  * expecting us to mangle its data buffers, so we need to encrypt into the bdev
1277  * aux buffer; then we can use that as the source for the disk data transfer.
1278  */
1279 static void
1280 crypto_write_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
1281 			void *aux_buf)
1282 {
1283 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1284 	int rc = 0;
1285 
1286 	rc = _crypto_operation(bdev_io, RTE_CRYPTO_CIPHER_OP_ENCRYPT, aux_buf);
1287 	if (rc != 0) {
1288 		spdk_bdev_io_put_aux_buf(bdev_io, aux_buf);
1289 		if (rc == -ENOMEM) {
1290 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1291 			io_ctx->ch = ch;
1292 			vbdev_crypto_queue_io(bdev_io);
1293 		} else {
1294 			SPDK_ERRLOG("Failed to submit bdev_io!\n");
1295 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1296 		}
1297 	}
1298 }
1299 
1300 /* Called when someone submits IO to this crypto vbdev. For IOs not relevant to crypto,
1301  * we're simply passing it on here via SPDK IO calls which in turn allocate another bdev IO
1302  * and call our cpl callback provided below along with the original bdev_io so that we can
1303  * complete it once this IO completes. For crypto operations, we'll either encrypt it first
1304  * (writes) then call back into bdev to submit it or we'll submit a read and then catch it
1305  * on the way back for decryption.
1306  */
1307 static void
1308 vbdev_crypto_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1309 {
1310 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
1311 					   crypto_bdev);
1312 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1313 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1314 	int rc = 0;
1315 
1316 	memset(io_ctx, 0, sizeof(struct crypto_bdev_io));
1317 	io_ctx->crypto_bdev = crypto_bdev;
1318 	io_ctx->crypto_ch = crypto_ch;
1319 	io_ctx->orig_io = bdev_io;
1320 	io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
1321 
1322 	switch (bdev_io->type) {
1323 	case SPDK_BDEV_IO_TYPE_READ:
1324 		spdk_bdev_io_get_buf(bdev_io, crypto_read_get_buf_cb,
1325 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1326 		break;
1327 	case SPDK_BDEV_IO_TYPE_WRITE:
1328 		/* Tell the bdev layer that we need an aux buf in addition to the data
1329 		 * buf already associated with the bdev.
1330 		 */
1331 		spdk_bdev_io_get_aux_buf(bdev_io, crypto_write_get_buf_cb);
1332 		break;
1333 	case SPDK_BDEV_IO_TYPE_UNMAP:
1334 		rc = spdk_bdev_unmap_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
1335 					    bdev_io->u.bdev.offset_blocks,
1336 					    bdev_io->u.bdev.num_blocks,
1337 					    _complete_internal_io, bdev_io);
1338 		break;
1339 	case SPDK_BDEV_IO_TYPE_FLUSH:
1340 		rc = spdk_bdev_flush_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
1341 					    bdev_io->u.bdev.offset_blocks,
1342 					    bdev_io->u.bdev.num_blocks,
1343 					    _complete_internal_io, bdev_io);
1344 		break;
1345 	case SPDK_BDEV_IO_TYPE_RESET:
1346 		rc = spdk_bdev_reset(crypto_bdev->base_desc, crypto_ch->base_ch,
1347 				     _complete_internal_io, bdev_io);
1348 		break;
1349 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1350 	default:
1351 		SPDK_ERRLOG("crypto: unknown I/O type %d\n", bdev_io->type);
1352 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1353 		return;
1354 	}
1355 
1356 	if (rc != 0) {
1357 		if (rc == -ENOMEM) {
1358 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1359 			io_ctx->ch = ch;
1360 			vbdev_crypto_queue_io(bdev_io);
1361 		} else {
1362 			SPDK_ERRLOG("Failed to submit bdev_io!\n");
1363 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1364 		}
1365 	}
1366 }
1367 
1368 /* We'll just call the base bdev and let it answer, except for the WRITE_ZEROES command, which
1369  * we always say we don't support so that the bdev layer will actually send us
1370  * real writes that we can encrypt.
1371  */
1372 static bool
1373 vbdev_crypto_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1374 {
1375 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1376 
1377 	switch (io_type) {
1378 	case SPDK_BDEV_IO_TYPE_WRITE:
1379 	case SPDK_BDEV_IO_TYPE_UNMAP:
1380 	case SPDK_BDEV_IO_TYPE_RESET:
1381 	case SPDK_BDEV_IO_TYPE_READ:
1382 	case SPDK_BDEV_IO_TYPE_FLUSH:
1383 		return spdk_bdev_io_type_supported(crypto_bdev->base_bdev, io_type);
1384 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1385 	/* Force the bdev layer to issue actual writes of zeroes so we can
1386 	 * encrypt them as regular writes.
1387 	 */
1388 	default:
1389 		return false;
1390 	}
1391 }
1392 
1393 /* Callback for unregistering the IO device. */
1394 static void
1395 _device_unregister_cb(void *io_device)
1396 {
1397 	struct vbdev_crypto *crypto_bdev = io_device;
1398 
1399 	/* Done with this crypto_bdev. */
1400 	rte_cryptodev_sym_session_free(crypto_bdev->session_decrypt);
1401 	rte_cryptodev_sym_session_free(crypto_bdev->session_encrypt);
1402 	crypto_bdev->opts = NULL;
1403 	free(crypto_bdev->crypto_bdev.name);
1404 	free(crypto_bdev);
1405 }
1406 
1407 /* Wrapper for the bdev close operation. */
1408 static void
1409 _vbdev_crypto_destruct(void *ctx)
1410 {
1411 	struct spdk_bdev_desc *desc = ctx;
1412 
1413 	spdk_bdev_close(desc);
1414 }
1415 
1416 /* Called after we've unregistered following a hot remove callback.
1417  * Our finish entry point will be called next.
1418  */
1419 static int
1420 vbdev_crypto_destruct(void *ctx)
1421 {
1422 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1423 
1424 	/* Remove this device from the internal list */
1425 	TAILQ_REMOVE(&g_vbdev_crypto, crypto_bdev, link);
1426 
1427 	/* Unclaim the underlying bdev. */
1428 	spdk_bdev_module_release_bdev(crypto_bdev->base_bdev);
1429 
1430 	/* Close the underlying bdev on its same opened thread. */
1431 	if (crypto_bdev->thread && crypto_bdev->thread != spdk_get_thread()) {
1432 		spdk_thread_send_msg(crypto_bdev->thread, _vbdev_crypto_destruct, crypto_bdev->base_desc);
1433 	} else {
1434 		spdk_bdev_close(crypto_bdev->base_desc);
1435 	}
1436 
1437 	/* Unregister the io_device. */
1438 	spdk_io_device_unregister(crypto_bdev, _device_unregister_cb);
1439 
1440 	g_number_of_claimed_volumes--;
1441 
1442 	return 0;
1443 }
1444 
1445 /* We supplied this as an entry point for upper layers that want to communicate with this
1446  * bdev. This is how they get a channel. We are passed the same context we provided when
1447  * we created our crypto vbdev in examine() which, for this bdev, is the address of one of
1448  * our context nodes. From here we'll ask the SPDK channel code to fill out our channel
1449  * struct and we'll keep it in our crypto node.
1450  */
1451 static struct spdk_io_channel *
1452 vbdev_crypto_get_io_channel(void *ctx)
1453 {
1454 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1455 
1456 	/* The IO channel code will allocate a channel for us which consists of
1457 	 * the SPDK channel structure plus the size of our crypto_io_channel struct
1458 	 * that we passed in when we registered our IO device. It will then call
1459 	 * our channel create callback to populate any elements that we need to
1460 	 * update.
1461 	 */
1462 	return spdk_get_io_channel(crypto_bdev);
1463 }
1464 
1465 /* This is the output for bdev_get_bdevs() for this vbdev */
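/* Illustrative shape of that output, with placeholder values only (bdev names,
 * PMD name and keys below are examples, not real data):
 *
 *   "crypto": {
 *     "base_bdev_name": "Nvme0n1",
 *     "name": "crypto0",
 *     "crypto_pmd": "crypto_aesni_mb",
 *     "key": "<hex key>",
 *     "key2": "<hex key2, only when a second key is configured>",
 *     "cipher": "AES_CBC"
 *   }
 */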
1466 static int
1467 vbdev_crypto_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1468 {
1469 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1470 	char *hexkey = NULL, *hexkey2 = NULL;
1471 	int rc = 0;
1472 
1473 	hexkey = hexlify(crypto_bdev->opts->key,
1474 			 crypto_bdev->opts->key_size);
1475 	if (!hexkey) {
1476 		return -ENOMEM;
1477 	}
1478 
1479 	if (crypto_bdev->opts->key2) {
1480 		hexkey2 = hexlify(crypto_bdev->opts->key2,
1481 				  crypto_bdev->opts->key2_size);
1482 		if (!hexkey2) {
1483 			rc = -ENOMEM;
1484 			goto out_err;
1485 		}
1486 	}
1487 
1488 	spdk_json_write_name(w, "crypto");
1489 	spdk_json_write_object_begin(w);
1490 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(crypto_bdev->base_bdev));
1491 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&crypto_bdev->crypto_bdev));
1492 	spdk_json_write_named_string(w, "crypto_pmd", crypto_bdev->opts->drv_name);
1493 	spdk_json_write_named_string(w, "key", hexkey);
1494 	if (hexkey2) {
1495 		spdk_json_write_named_string(w, "key2", hexkey2);
1496 	}
1497 	spdk_json_write_named_string(w, "cipher", crypto_bdev->opts->cipher);
1498 	spdk_json_write_object_end(w);
1499 out_err:
1500 	if (hexkey) {
1501 		memset(hexkey, 0, strlen(hexkey));
1502 		free(hexkey);
1503 	}
1504 	if (hexkey2) {
1505 		memset(hexkey2, 0, strlen(hexkey2));
1506 		free(hexkey2);
1507 	}
1508 	return rc;
1509 }
1510 
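/* Emit the bdev_crypto_create calls needed to recreate the current configuration.
 * Illustrative shape of one emitted entry, with placeholder values only:
 *
 *   {
 *     "method": "bdev_crypto_create",
 *     "params": {
 *       "base_bdev_name": "Nvme0n1",
 *       "name": "crypto0",
 *       "crypto_pmd": "crypto_aesni_mb",
 *       "key": "<hex key>",
 *       "cipher": "AES_CBC"
 *     }
 *   }
 */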
1511 static int
1512 vbdev_crypto_config_json(struct spdk_json_write_ctx *w)
1513 {
1514 	struct vbdev_crypto *crypto_bdev;
1515 
1516 	TAILQ_FOREACH(crypto_bdev, &g_vbdev_crypto, link) {
1517 		char *hexkey = NULL, *hexkey2 = NULL;
1518 
1519 		hexkey = hexlify(crypto_bdev->opts->key,
1520 				 crypto_bdev->opts->key_size);
1521 		if (!hexkey) {
1522 			return -ENOMEM;
1523 		}
1524 
1525 		if (crypto_bdev->opts->key2) {
1526 			hexkey2 = hexlify(crypto_bdev->opts->key2,
1527 					  crypto_bdev->opts->key2_size);
1528 			if (!hexkey2) {
1529 				memset(hexkey, 0, strlen(hexkey));
1530 				free(hexkey);
1531 				return -ENOMEM;
1532 			}
1533 		}
1534 
1535 		spdk_json_write_object_begin(w);
1536 		spdk_json_write_named_string(w, "method", "bdev_crypto_create");
1537 		spdk_json_write_named_object_begin(w, "params");
1538 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(crypto_bdev->base_bdev));
1539 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&crypto_bdev->crypto_bdev));
1540 		spdk_json_write_named_string(w, "crypto_pmd", crypto_bdev->opts->drv_name);
1541 		spdk_json_write_named_string(w, "key", hexkey);
1542 		if (hexkey2) {
1543 			spdk_json_write_named_string(w, "key2", hexkey2);
1544 		}
1545 		spdk_json_write_named_string(w, "cipher", crypto_bdev->opts->cipher);
1546 		spdk_json_write_object_end(w);
1547 		spdk_json_write_object_end(w);
1548 
1549 		if (hexkey) {
1550 			memset(hexkey, 0, strlen(hexkey));
1551 			free(hexkey);
1552 		}
1553 		if (hexkey2) {
1554 			memset(hexkey2, 0, strlen(hexkey2));
1555 			free(hexkey2);
1556 		}
1557 	}
1558 	return 0;
1559 }
1560 
1561 /* Helper function for the channel creation callback. */
1562 static void
1563 _assign_device_qp(struct vbdev_crypto *crypto_bdev, struct device_qp *device_qp,
1564 		  struct crypto_io_channel *crypto_ch)
1565 {
1566 	pthread_mutex_lock(&g_device_qp_lock);
1567 	if (strcmp(crypto_bdev->opts->drv_name, QAT) == 0) {
1568 		/* For some QAT devices, the optimal qp to use is every 32nd as this spreads the
1569 		 * workload out over the multiple virtual functions in the device. For the devices
1570 		 * where this isn't the case, it doesn't hurt.
1571 		 */
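		/* Worked example (assuming the qp list is kept in increasing index order):
		 * with 64 total qps and a QAT_VF_SPREAD of 32, successive channels are
		 * assigned qp indexes 0, 32, 1, 33, 2, 34, ... so the load alternates
		 * across the virtual functions instead of filling one VF first.
		 */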
1572 		TAILQ_FOREACH(device_qp, &g_device_qp_qat, link) {
1573 			if (device_qp->index != g_next_qat_index) {
1574 				continue;
1575 			}
1576 			if (device_qp->in_use == false) {
1577 				crypto_ch->device_qp = device_qp;
1578 				device_qp->in_use = true;
1579 				g_next_qat_index = (g_next_qat_index + QAT_VF_SPREAD) % g_qat_total_qp;
1580 				break;
1581 			} else {
1582 				/* if the preferred index is used, skip to the next one in this set. */
1583 				g_next_qat_index = (g_next_qat_index + 1) % g_qat_total_qp;
1584 			}
1585 		}
1586 	} else if (strcmp(crypto_bdev->opts->drv_name, AESNI_MB) == 0) {
1587 		TAILQ_FOREACH(device_qp, &g_device_qp_aesni_mb, link) {
1588 			if (device_qp->in_use == false) {
1589 				crypto_ch->device_qp = device_qp;
1590 				device_qp->in_use = true;
1591 				break;
1592 			}
1593 		}
1594 	} else if (strcmp(crypto_bdev->opts->drv_name, MLX5) == 0) {
1595 		TAILQ_FOREACH(device_qp, &g_device_qp_mlx5, link) {
1596 			if (device_qp->in_use == false) {
1597 				crypto_ch->device_qp = device_qp;
1598 				device_qp->in_use = true;
1599 				break;
1600 			}
1601 		}
1602 	}
1603 	pthread_mutex_unlock(&g_device_qp_lock);
1604 }
1605 
1606 /* We provide this callback for the SPDK channel code to create a channel using
1607  * the channel struct we provided in our module get_io_channel() entry point. Here
1608  * we get and save off an underlying base channel of the device below us so that
1609  * we can communicate with the base bdev on a per channel basis. We also register the
1610  * poller used to complete crypto operations from the device.
1611  */
1612 static int
1613 crypto_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1614 {
1615 	struct crypto_io_channel *crypto_ch = ctx_buf;
1616 	struct vbdev_crypto *crypto_bdev = io_device;
1617 	struct device_qp *device_qp = NULL;
1618 
1619 	crypto_ch->base_ch = spdk_bdev_get_io_channel(crypto_bdev->base_desc);
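	/* Register the completion poller with a period of 0 so crypto_dev_poller()
	 * runs on every iteration of this thread's poll loop, draining completions
	 * from the device qp as often as possible.
	 */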
1620 	crypto_ch->poller = SPDK_POLLER_REGISTER(crypto_dev_poller, crypto_ch, 0);
1621 	crypto_ch->device_qp = NULL;
1622 
1623 	/* Assign a device/qp combination that is unique per channel per PMD. */
1624 	_assign_device_qp(crypto_bdev, device_qp, crypto_ch);
1625 	assert(crypto_ch->device_qp);
1626 
1627 	/* We use this queue to track outstanding IO in our layer. */
1628 	TAILQ_INIT(&crypto_ch->pending_cry_ios);
1629 
1630 	/* We use this to queue up crypto ops when the device is busy. */
1631 	TAILQ_INIT(&crypto_ch->queued_cry_ops);
1632 
1633 	return 0;
1634 }
1635 
1636 /* We provide this callback for the SPDK channel code to destroy a channel
1637  * created with our create callback. We just need to undo anything we did
1638  * when we created it.
1639  */
1640 static void
1641 crypto_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1642 {
1643 	struct crypto_io_channel *crypto_ch = ctx_buf;
1644 
1645 	pthread_mutex_lock(&g_device_qp_lock);
1646 	crypto_ch->device_qp->in_use = false;
1647 	pthread_mutex_unlock(&g_device_qp_lock);
1648 
1649 	spdk_poller_unregister(&crypto_ch->poller);
1650 	spdk_put_io_channel(crypto_ch->base_ch);
1651 }
1652 
1653 /* Create the association between the bdev name and the vbdev name and
1654  * insert it into the global list. */
1655 static int
1656 vbdev_crypto_insert_name(struct vbdev_crypto_opts *opts, struct bdev_names **out)
1657 {
1658 	struct bdev_names *name;
1659 	bool found = false;
1660 	int j;
1661 
1662 	assert(opts);
1663 	assert(out);
1664 
1665 	TAILQ_FOREACH(name, &g_bdev_names, link) {
1666 		if (strcmp(opts->vbdev_name, name->opts->vbdev_name) == 0) {
1667 			SPDK_ERRLOG("Crypto bdev %s already exists\n", opts->vbdev_name);
1668 			return -EEXIST;
1669 		}
1670 	}
1671 
1672 	for (j = 0; j < MAX_NUM_DRV_TYPES ; j++) {
1673 		if (strcmp(opts->drv_name, g_driver_names[j]) == 0) {
1674 			found = true;
1675 			break;
1676 		}
1677 	}
1678 	if (!found) {
1679 		SPDK_ERRLOG("Crypto PMD type %s is not supported.\n", opts->drv_name);
1680 		return -EINVAL;
1681 	}
1682 
1683 	name = calloc(1, sizeof(struct bdev_names));
1684 	if (!name) {
1685 		SPDK_ERRLOG("Failed to allocate memory for bdev_names.\n");
1686 		return -ENOMEM;
1687 	}
1688 
1689 	name->opts = opts;
1690 	TAILQ_INSERT_TAIL(&g_bdev_names, name, link);
1691 	*out = name;
1692 
1693 	return 0;
1694 }
1695 
1696 void
1697 free_crypto_opts(struct vbdev_crypto_opts *opts)
1698 {
1699 	free(opts->bdev_name);
1700 	free(opts->vbdev_name);
1701 	free(opts->drv_name);
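	/* Scrub key material before freeing so plaintext keys do not linger in
	 * freed heap memory.
	 */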
1702 	if (opts->xts_key) {
1703 		memset(opts->xts_key, 0,
1704 		       opts->key_size + opts->key2_size);
1705 		free(opts->xts_key);
1706 	}
1707 	memset(opts->key, 0, opts->key_size);
1708 	free(opts->key);
1709 	opts->key_size = 0;
1710 	if (opts->key2) {
1711 		memset(opts->key2, 0, opts->key2_size);
1712 		free(opts->key2);
1713 	}
1714 	opts->key2_size = 0;
1715 	free(opts);
1716 }
1717 
1718 static void
1719 vbdev_crypto_delete_name(struct bdev_names *name)
1720 {
1721 	TAILQ_REMOVE(&g_bdev_names, name, link);
1722 	if (name->opts) {
1723 		free_crypto_opts(name->opts);
1724 		name->opts = NULL;
1725 	}
1726 	free(name);
1727 }
1728 
1729 /* RPC entry point for crypto creation. */
1730 int
1731 create_crypto_disk(struct vbdev_crypto_opts *opts)
1732 {
1733 	struct bdev_names *name = NULL;
1734 	int rc;
1735 
1736 	rc = vbdev_crypto_insert_name(opts, &name);
1737 	if (rc) {
1738 		return rc;
1739 	}
1740 
1741 	rc = vbdev_crypto_claim(opts->bdev_name);
1742 	if (rc == -ENODEV) {
1743 		SPDK_NOTICELOG("vbdev creation deferred pending base bdev arrival\n");
1744 		rc = 0;
1745 	}
1746 
1747 	if (rc) {
1748 		assert(name != NULL);
1749 		/* In case of error we leave it to the caller to deallocate @opts,
1750 		 * since that is its responsibility. Setting name->opts = NULL lets
1751 		 * vbdev_crypto_delete_name() know it does not have to do anything
1752 		 * about @opts.
1753 		 */
1754 		name->opts = NULL;
1755 		vbdev_crypto_delete_name(name);
1756 	}
1757 	return rc;
1758 }
1759 
1760 /* Called at module init time; fully initializes the crypto drivers so that
1761  * later examine() calls can create the configured vbdevs.
1762  */
1763 static int
1764 vbdev_crypto_init(void)
1765 {
1766 	int rc = 0;
1767 
1768 	/* Fully configure both SW and HW drivers. */
1769 	rc = vbdev_crypto_init_crypto_drivers();
1770 	if (rc) {
1771 		SPDK_ERRLOG("Error setting up crypto devices\n");
1772 	}
1773 
1774 	return rc;
1775 }
1776 
1777 /* Called when the entire module is being torn down. */
1778 static void
1779 vbdev_crypto_finish(void)
1780 {
1781 	struct bdev_names *name;
1782 	struct vbdev_dev *device;
1783 
1784 	while ((name = TAILQ_FIRST(&g_bdev_names))) {
1785 		vbdev_crypto_delete_name(name);
1786 	}
1787 
1788 	while ((device = TAILQ_FIRST(&g_vbdev_devs))) {
1789 		TAILQ_REMOVE(&g_vbdev_devs, device, link);
1790 		release_vbdev_dev(device);
1791 	}
1792 	rte_vdev_uninit(AESNI_MB);
1793 
1794 	/* These are removed in release_vbdev_dev() */
1795 	assert(TAILQ_EMPTY(&g_device_qp_qat));
1796 	assert(TAILQ_EMPTY(&g_device_qp_aesni_mb));
1797 	assert(TAILQ_EMPTY(&g_device_qp_mlx5));
1798 
1799 	rte_mempool_free(g_crypto_op_mp);
1800 	rte_mempool_free(g_mbuf_mp);
1801 	rte_mempool_free(g_session_mp);
1802 	if (g_session_mp_priv != NULL) {
1803 		rte_mempool_free(g_session_mp_priv);
1804 	}
1805 }
1806 
1807 /* During init we'll be asked how much memory we'd like passed to us
1808  * in bdev_io structures as context. Here's where we specify how
1809  * much context we want per IO.
1810  */
1811 static int
1812 vbdev_crypto_get_ctx_size(void)
1813 {
1814 	return sizeof(struct crypto_bdev_io);
1815 }
1816 
1817 static void
1818 vbdev_crypto_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
1819 {
1820 	struct vbdev_crypto *crypto_bdev, *tmp;
1821 
1822 	TAILQ_FOREACH_SAFE(crypto_bdev, &g_vbdev_crypto, link, tmp) {
1823 		if (bdev_find == crypto_bdev->base_bdev) {
1824 			spdk_bdev_unregister(&crypto_bdev->crypto_bdev, NULL, NULL);
1825 		}
1826 	}
1827 }
1828 
1829 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
1830 static void
1831 vbdev_crypto_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1832 				void *event_ctx)
1833 {
1834 	switch (type) {
1835 	case SPDK_BDEV_EVENT_REMOVE:
1836 		vbdev_crypto_base_bdev_hotremove_cb(bdev);
1837 		break;
1838 	default:
1839 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1840 		break;
1841 	}
1842 }
1843 
1844 static void
1845 vbdev_crypto_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1846 {
1847 	/* No config per bdev needed */
1848 }
1849 
1850 /* When we register our bdev this is how we specify our entry points. */
1851 static const struct spdk_bdev_fn_table vbdev_crypto_fn_table = {
1852 	.destruct		= vbdev_crypto_destruct,
1853 	.submit_request		= vbdev_crypto_submit_request,
1854 	.io_type_supported	= vbdev_crypto_io_type_supported,
1855 	.get_io_channel		= vbdev_crypto_get_io_channel,
1856 	.dump_info_json		= vbdev_crypto_dump_info_json,
1857 	.write_config_json	= vbdev_crypto_write_config_json
1858 };
1859 
1860 static struct spdk_bdev_module crypto_if = {
1861 	.name = "crypto",
1862 	.module_init = vbdev_crypto_init,
1863 	.get_ctx_size = vbdev_crypto_get_ctx_size,
1864 	.examine_config = vbdev_crypto_examine,
1865 	.module_fini = vbdev_crypto_finish,
1866 	.config_json = vbdev_crypto_config_json
1867 };
1868 
1869 SPDK_BDEV_MODULE_REGISTER(crypto, &crypto_if)
1870 
1871 static int
1872 vbdev_crypto_claim(const char *bdev_name)
1873 {
1874 	struct bdev_names *name;
1875 	struct vbdev_crypto *vbdev;
1876 	struct vbdev_dev *device;
1877 	struct spdk_bdev *bdev;
1878 	bool found = false;
1879 	uint8_t key_size;
1880 	int rc = 0;
1881 
1882 	if (g_number_of_claimed_volumes >= MAX_CRYPTO_VOLUMES) {
1883 		SPDK_DEBUGLOG(vbdev_crypto, "Reached max number of claimed volumes\n");
1884 		return -EINVAL;
1885 	}
1886 	g_number_of_claimed_volumes++;
1887 
1888 	/* Check our list of names from config versus this bdev and if
1889 	 * there's a match, create the crypto_bdev & bdev accordingly.
1890 	 */
1891 	TAILQ_FOREACH(name, &g_bdev_names, link) {
1892 		if (strcmp(name->opts->bdev_name, bdev_name) != 0) {
1893 			continue;
1894 		}
1895 		SPDK_DEBUGLOG(vbdev_crypto, "Match on %s\n", bdev_name);
1896 
1897 		vbdev = calloc(1, sizeof(struct vbdev_crypto));
1898 		if (!vbdev) {
1899 			SPDK_ERRLOG("Failed to allocate memory for crypto_bdev.\n");
1900 			rc = -ENOMEM;
1901 			goto error_vbdev_alloc;
1902 		}
1903 		vbdev->crypto_bdev.product_name = "crypto";
1904 
1905 		vbdev->crypto_bdev.name = strdup(name->opts->vbdev_name);
1906 		if (!vbdev->crypto_bdev.name) {
1907 			SPDK_ERRLOG("Failed to allocate memory for crypto_bdev name.\n");
1908 			rc = -ENOMEM;
1909 			goto error_bdev_name;
1910 		}
1911 
1912 		rc = spdk_bdev_open_ext(bdev_name, true, vbdev_crypto_base_bdev_event_cb,
1913 					NULL, &vbdev->base_desc);
1914 		if (rc) {
1915 			if (rc != -ENODEV) {
1916 				SPDK_ERRLOG("Failed to open bdev %s: error %d\n", bdev_name, rc);
1917 			}
1918 			goto error_open;
1919 		}
1920 
1921 		bdev = spdk_bdev_desc_get_bdev(vbdev->base_desc);
1922 		vbdev->base_bdev = bdev;
1923 
1924 		if (strcmp(name->opts->drv_name, MLX5) == 0) {
1925 			vbdev->qp_desc_nr = CRYPTO_QP_DESCRIPTORS_MLX5;
1926 		} else {
1927 			vbdev->qp_desc_nr = CRYPTO_QP_DESCRIPTORS;
1928 		}
1929 
1930 		vbdev->crypto_bdev.write_cache = bdev->write_cache;
1931 		if (strcmp(name->opts->drv_name, QAT) == 0) {
1932 			vbdev->crypto_bdev.required_alignment =
1933 				spdk_max(spdk_u32log2(bdev->blocklen), bdev->required_alignment);
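			/* required_alignment is a power-of-two exponent (log2 of the byte
			 * alignment), e.g. a 512-byte blocklen gives spdk_u32log2(512) == 9,
			 * i.e. at least 512-byte aligned buffers when QAT is in use.
			 */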
1934 			SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1935 				       vbdev->crypto_bdev.required_alignment);
1936 			SPDK_NOTICELOG("QAT using cipher: %s\n", name->opts->cipher);
1937 		} else if (strcmp(name->opts->drv_name, MLX5) == 0) {
1938 			vbdev->crypto_bdev.required_alignment = bdev->required_alignment;
1939 			SPDK_NOTICELOG("MLX5 using cipher: %s\n", name->opts->cipher);
1940 		} else {
1941 			vbdev->crypto_bdev.required_alignment = bdev->required_alignment;
1942 			SPDK_NOTICELOG("AESNI_MB using cipher: %s\n", name->opts->cipher);
1943 		}
1944 		vbdev->cipher_xform.cipher.iv.length = IV_LENGTH;
1945 
1946 		/* Note: CRYPTO_MAX_IO is in units of bytes, optimal_io_boundary is
1947 		 * in units of blocks.
1948 		 */
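		/* For example, assuming CRYPTO_MAX_IO is 64 KiB (see vbdev_crypto.h), a
		 * 512-byte blocklen caps the boundary at 128 blocks.
		 */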
1949 		if (bdev->optimal_io_boundary > 0) {
1950 			vbdev->crypto_bdev.optimal_io_boundary =
1951 				spdk_min((CRYPTO_MAX_IO / bdev->blocklen), bdev->optimal_io_boundary);
1952 		} else {
1953 			vbdev->crypto_bdev.optimal_io_boundary = (CRYPTO_MAX_IO / bdev->blocklen);
1954 		}
1955 		vbdev->crypto_bdev.split_on_optimal_io_boundary = true;
1956 		vbdev->crypto_bdev.blocklen = bdev->blocklen;
1957 		vbdev->crypto_bdev.blockcnt = bdev->blockcnt;
1958 
1959 		/* This is the context that is passed to us when the bdev
1960 		 * layer calls in so we'll save our crypto_bdev node here.
1961 		 */
1962 		vbdev->crypto_bdev.ctxt = vbdev;
1963 		vbdev->crypto_bdev.fn_table = &vbdev_crypto_fn_table;
1964 		vbdev->crypto_bdev.module = &crypto_if;
1965 
1966 		/* Assign crypto opts from the name. The pointer is valid up to the point
1967 		 * the module is unloaded and all names removed from the list. */
1968 		vbdev->opts = name->opts;
1969 
1970 		TAILQ_INSERT_TAIL(&g_vbdev_crypto, vbdev, link);
1971 
1972 		spdk_io_device_register(vbdev, crypto_bdev_ch_create_cb, crypto_bdev_ch_destroy_cb,
1973 					sizeof(struct crypto_io_channel), vbdev->crypto_bdev.name);
1974 
1975 		/* Save the thread where the base device is opened */
1976 		vbdev->thread = spdk_get_thread();
1977 
1978 		rc = spdk_bdev_module_claim_bdev(bdev, vbdev->base_desc, vbdev->crypto_bdev.module);
1979 		if (rc) {
1980 			SPDK_ERRLOG("Failed to claim bdev %s\n", spdk_bdev_get_name(bdev));
1981 			goto error_claim;
1982 		}
1983 
1984 		/* To init the sessions we have to get the cryptodev device ID for this vbdev */
1985 		TAILQ_FOREACH(device, &g_vbdev_devs, link) {
1986 			if (strcmp(device->cdev_info.driver_name, vbdev->opts->drv_name) == 0) {
1987 				found = true;
1988 				break;
1989 			}
1990 		}
1991 		if (found == false) {
1992 			SPDK_ERRLOG("Failed to match crypto device driver to crypto vbdev.\n");
1993 			rc = -EINVAL;
1994 			goto error_cant_find_devid;
1995 		}
1996 
1997 		/* Get sessions. */
1998 		vbdev->session_encrypt = rte_cryptodev_sym_session_create(g_session_mp);
1999 		if (NULL == vbdev->session_encrypt) {
2000 			SPDK_ERRLOG("Failed to create encrypt crypto session.\n");
2001 			rc = -EINVAL;
2002 			goto error_session_en_create;
2003 		}
2004 
2005 		vbdev->session_decrypt = rte_cryptodev_sym_session_create(g_session_mp);
2006 		if (NULL == vbdev->session_decrypt) {
2007 			SPDK_ERRLOG("Failed to create decrypt crypto session.\n");
2008 			rc = -EINVAL;
2009 			goto error_session_de_create;
2010 		}
2011 
2012 		/* Init our per vbdev xform with the desired cipher options. */
2013 		vbdev->cipher_xform.type = RTE_CRYPTO_SYM_XFORM_CIPHER;
2014 		vbdev->cipher_xform.cipher.iv.offset = IV_OFFSET;
2015 		if (strcmp(vbdev->opts->cipher, AES_CBC) == 0) {
2016 			vbdev->cipher_xform.cipher.key.data = vbdev->opts->key;
2017 			vbdev->cipher_xform.cipher.key.length = vbdev->opts->key_size;
2018 			vbdev->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_CBC;
2019 		} else if (strcmp(vbdev->opts->cipher, AES_XTS) == 0) {
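			/* For AES_XTS the cipher key buffer holds both XTS keys back to back,
			 * which is why the key length below is key_size + key2_size; opts->xts_key
			 * is that combined buffer, assembled from key and key2 when the opts
			 * were built.
			 */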
2020 			key_size = vbdev->opts->key_size + vbdev->opts->key2_size;
2021 			vbdev->cipher_xform.cipher.key.data = vbdev->opts->xts_key;
2022 			vbdev->cipher_xform.cipher.key.length = key_size;
2023 			vbdev->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_XTS;
2024 		} else {
2025 			SPDK_ERRLOG("Invalid cipher name %s.\n", vbdev->opts->cipher);
2026 			rc = -EINVAL;
2027 			goto error_session_de_create;
2028 		}
2029 		vbdev->cipher_xform.cipher.iv.length = IV_LENGTH;
2030 
2031 		vbdev->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_ENCRYPT;
2032 		rc = rte_cryptodev_sym_session_init(device->cdev_id, vbdev->session_encrypt,
2033 						    &vbdev->cipher_xform,
2034 						    g_session_mp_priv ? g_session_mp_priv : g_session_mp);
2035 		if (rc < 0) {
2036 			SPDK_ERRLOG("Failed to init encrypt session: error %d\n", rc);
2037 			rc = -EINVAL;
2038 			goto error_session_init;
2039 		}
2040 
2041 		vbdev->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_DECRYPT;
2042 		rc = rte_cryptodev_sym_session_init(device->cdev_id, vbdev->session_decrypt,
2043 						    &vbdev->cipher_xform,
2044 						    g_session_mp_priv ? g_session_mp_priv : g_session_mp);
2045 		if (rc < 0) {
2046 			SPDK_ERRLOG("Failed to init decrypt session: error %d\n", rc);
2047 			rc = -EINVAL;
2048 			goto error_session_init;
2049 		}
2050 
2051 		rc = spdk_bdev_register(&vbdev->crypto_bdev);
2052 		if (rc < 0) {
2053 			SPDK_ERRLOG("Failed to register vbdev: error %d\n", rc);
2054 			rc = -EINVAL;
2055 			goto error_bdev_register;
2056 		}
2057 		SPDK_DEBUGLOG(vbdev_crypto, "Registered io_device and virtual bdev for: %s\n",
2058 			      vbdev->opts->vbdev_name);
2059 		break;
2060 	}
2061 
2062 	return rc;
2063 
2064 	/* Error cleanup paths. */
2065 error_bdev_register:
2066 error_session_init:
2067 	rte_cryptodev_sym_session_free(vbdev->session_decrypt);
2068 error_session_de_create:
2069 	rte_cryptodev_sym_session_free(vbdev->session_encrypt);
2070 error_session_en_create:
2071 error_cant_find_devid:
2072 	spdk_bdev_module_release_bdev(vbdev->base_bdev);
2073 error_claim:
2074 	TAILQ_REMOVE(&g_vbdev_crypto, vbdev, link);
2075 	spdk_io_device_unregister(vbdev, NULL);
2076 	spdk_bdev_close(vbdev->base_desc);
2077 error_open:
2078 	free(vbdev->crypto_bdev.name);
2079 error_bdev_name:
2080 	free(vbdev);
2081 error_vbdev_alloc:
2082 	g_number_of_claimed_volumes--;
2083 	return rc;
2084 }
2085 
2086 /* RPC entry for deleting a crypto vbdev. */
2087 void
2088 delete_crypto_disk(const char *bdev_name, spdk_delete_crypto_complete cb_fn,
2089 		   void *cb_arg)
2090 {
2091 	struct bdev_names *name;
2092 	int rc;
2093 
2094 	/* Some cleanup happens in the destruct callback. */
2095 	rc = spdk_bdev_unregister_by_name(bdev_name, &crypto_if, cb_fn, cb_arg);
2096 	if (rc == 0) {
2097 		/* Remove the association (vbdev, bdev) from g_bdev_names. This is required so that the
2098 		 * vbdev does not get re-created if the same bdev is constructed at some other time,
2099 		 * unless the underlying bdev was hot-removed.
2100 		 */
2101 		TAILQ_FOREACH(name, &g_bdev_names, link) {
2102 			if (strcmp(name->opts->vbdev_name, bdev_name) == 0) {
2103 				vbdev_crypto_delete_name(name);
2104 				break;
2105 			}
2106 		}
2107 	} else {
2108 		cb_fn(cb_arg, rc);
2109 	}
2110 }
2111 
2112 /* Because we registered this function as the examine_config callback in our
2113  * module struct, we'll get this call any time a new bdev shows up.
2114  * Here we need to decide if we care about it and if so what to do. We
2115  * check the new bdev against the list of names built up from the
2116  * creation RPCs and if the user configured us to attach to this
2117  * bdev, here's where we do it.
2118  */
2119 static void
2120 vbdev_crypto_examine(struct spdk_bdev *bdev)
2121 {
2122 	vbdev_crypto_claim(spdk_bdev_get_name(bdev));
2123 	spdk_bdev_module_examine_done(&crypto_if);
2124 }
2125 
2126 SPDK_LOG_REGISTER_COMPONENT(vbdev_crypto)
2127