xref: /spdk/module/bdev/raid/bdev_raid.c (revision 0830da3347f61d177686868b29894fe66c4875a0)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
/* Sentinel offset value meaning "not set" / "no background process active" */
#define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
/* Queue depth limit for background process requests (name suggests QD; not used in this chunk) */
#define RAID_BDEV_PROCESS_MAX_QD	16

/* Default size (KiB) of the background process window (see g_opts) */
#define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024

/* Set once module shutdown begins; makes destruct close base bdev descriptors */
static bool g_shutdown_started = false;

/* List of all raid bdevs */
struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);

/* Registered raid level modules (see raid_bdev_module_list_add()) */
static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
28 /*
29  * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
30  * contains the relationship of raid bdev io channel with base bdev io channels.
31  */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Process progress in blocks; I/O below this offset targets the
		 * processed range. RAID_OFFSET_BLOCKS_INVALID when no process is active. */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Child channel used for I/O to the already processed range */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
46 
/* Lifecycle states of a raid bdev background process */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
53 
/* Context of a raid bdev background process */
struct raid_bdev_process {
	/* The raid bdev the process operates on */
	struct raid_bdev		*raid_bdev;
	enum raid_process_type		type;
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	struct raid_bdev_io_channel	*raid_ch;
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* NOTE(review): window sizes presumably in blocks, derived from
	 * g_opts.process_window_size_kb - confirm against process setup code */
	uint64_t			max_window_size;
	uint64_t			window_size;
	uint64_t			window_remaining;
	int				window_status;
	/* Offset of the current process window, in blocks */
	uint64_t			window_offset;
	bool				window_range_locked;
	/* Target base bdev; treated as missing by regular I/O until the process completes */
	struct raid_base_bdev_info	*target;
	int				status;
	/* Callbacks queued to run when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
};
71 
/* Callback with context, queued on raid_bdev_process.finish_actions */
struct raid_process_finish_action {
	spdk_msg_fn cb;
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
77 
/* Global raid bdev module options, read/written via raid_bdev_get_opts()/raid_bdev_set_opts() */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
};
81 
82 void
83 raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
84 {
85 	*opts = g_opts;
86 }
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
126 struct spdk_io_channel *
127 raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
128 {
129 	return raid_ch->base_channel[idx];
130 }
131 
132 void *
133 raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
134 {
135 	assert(raid_ch->module_channel != NULL);
136 
137 	return spdk_io_channel_get_ctx(raid_ch->module_channel);
138 }
139 
140 /* Function declarations */
141 static void	raid_bdev_examine(struct spdk_bdev *bdev);
142 static int	raid_bdev_init(void);
143 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
144 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
145 
146 static void
147 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
148 {
149 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
150 
151 	if (raid_ch->process.target_ch != NULL) {
152 		spdk_put_io_channel(raid_ch->process.target_ch);
153 		raid_ch->process.target_ch = NULL;
154 	}
155 
156 	if (raid_ch->process.ch_processed != NULL) {
157 		free(raid_ch->process.ch_processed->base_channel);
158 		free(raid_ch->process.ch_processed);
159 		raid_ch->process.ch_processed = NULL;
160 	}
161 }
162 
/*
 * Set up the background-process part of a raid bdev IO channel: get an IO
 * channel for the process target and build a second ("processed") child
 * channel whose base channel array routes the target's slot to that channel.
 * I/O to the already processed range is submitted through this child channel.
 * Returns 0 on success, -ENOMEM on any failure (partial setup is cleaned up).
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* Share the parent's base channels, except the target's slot, which uses
	 * the target's own channel (that slot is skipped in the parent channel,
	 * see raid_bdev_create_cb()). */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The child channel itself carries no nested process state */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
212 
213 /*
214  * brief:
215  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
216  * hierarchy from raid bdev to base bdev io channels. It will be called per core
217  * params:
218  * io_device - pointer to raid bdev io device represented by raid_bdev
219  * ctx_buf - pointer to context buffer for raid bdev io channel
220  * returns:
221  * 0 - success
222  * non zero - failure
223  */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	/* NOTE(review): base_bdev_lock presumably guards base_bdev_info descriptors
	 * and the process pointer against concurrent updates during setup - confirm. */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].desc == NULL ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	/* Optional per-channel context provided by the raid level module */
	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err;
		}
	}

	/* Set up the background-process part of the channel, or mark it inactive */
	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	return 0;
err:
	/* Unwind: channels never acquired are still NULL from calloc() */
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
297 
298 /*
299  * brief:
300  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
301  * hierarchy from raid bdev to base bdev io channels. It will be called per core
302  * params:
303  * io_device - pointer to raid bdev io device represented by raid_bdev
304  * ctx_buf - pointer to context buffer for raid bdev io channel
305  * returns:
306  * none
307  */
308 static void
309 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
310 {
311 	struct raid_bdev *raid_bdev = io_device;
312 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
313 	uint8_t i;
314 
315 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
316 
317 	assert(raid_ch != NULL);
318 	assert(raid_ch->base_channel);
319 
320 	if (raid_ch->module_channel) {
321 		spdk_put_io_channel(raid_ch->module_channel);
322 	}
323 
324 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
325 		/* Free base bdev channels */
326 		if (raid_ch->base_channel[i] != NULL) {
327 			spdk_put_io_channel(raid_ch->base_channel[i]);
328 		}
329 	}
330 	free(raid_ch->base_channel);
331 	raid_ch->base_channel = NULL;
332 
333 	raid_bdev_ch_process_cleanup(raid_ch);
334 }
335 
336 /*
337  * brief:
338  * raid_bdev_cleanup is used to cleanup raid_bdev related data
339  * structures.
340  * params:
341  * raid_bdev - pointer to raid_bdev
342  * returns:
343  * none
344  */
static void
raid_bdev_cleanup(struct raid_bdev *raid_bdev)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* All base bdev descriptors must already be closed (see
	 * raid_bdev_free_base_bdev_resource()); only the names remain to free. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		assert(base_info->desc == NULL);
		free(base_info->name);
	}

	/* Unlink from global tracking; memory itself is released by raid_bdev_free() */
	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
}
362 
/* Release all memory owned by the raid_bdev structure itself.
 * Order matters: members are freed before the containing structure. */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_free_superblock(raid_bdev);
	spdk_spin_destroy(&raid_bdev->base_bdev_lock);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
372 
/* Unlink the raid bdev from global tracking and free its memory */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
379 
380 /*
381  * brief:
382  * free resource of base bdev for raid bdev
383  * params:
384  * base_info - raid base bdev info
385  * returns:
386  * none
387  */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* The uuid is kept while the raid bdev is still CONFIGURING;
	 * NOTE(review): presumably so the slot can be matched again - confirm. */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}

	/* Nothing more to release if the base bdev was never opened */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release the module's claim, close the descriptor and drop the
	 * app-thread IO channel that was obtained at configure time */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		assert(raid_bdev->num_base_bdevs_discovered);
		raid_bdev->num_base_bdevs_discovered--;
		base_info->is_configured = false;
	}
}
417 
418 static void
419 raid_bdev_io_device_unregister_cb(void *io_device)
420 {
421 	struct raid_bdev *raid_bdev = io_device;
422 
423 	if (raid_bdev->num_base_bdevs_discovered == 0) {
424 		/* Free raid_bdev when there are no base bdevs left */
425 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
426 		raid_bdev_cleanup(raid_bdev);
427 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
428 		raid_bdev_free(raid_bdev);
429 	} else {
430 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
431 	}
432 }
433 
434 void
435 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
436 {
437 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
438 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
439 	}
440 }
441 
/* Runs on the app thread: release base bdev resources, stop the raid level
 * module and signal destruct completion via raid_bdev_module_stop_done(). */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	if (raid_bdev->module->stop != NULL) {
		/* stop() returning false defers completion; presumably the module
		 * calls raid_bdev_module_stop_done() itself later - confirm. */
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
474 
/*
 * Destruct callback of the raid bdev. Defers the actual teardown to the app
 * thread; returning 1 indicates asynchronous destruction, completed later via
 * spdk_bdev_destruct_done() (see raid_bdev_io_device_unregister_cb()).
 */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
482 
483 static int
484 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
485 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
486 {
487 	struct spdk_dif_ctx dif_ctx;
488 	struct spdk_dif_error err_blk = {};
489 	int rc;
490 	struct spdk_dif_ctx_init_ext_opts dif_opts;
491 	struct iovec md_iov = {
492 		.iov_base	= md_buf,
493 		.iov_len	= num_blocks * bdev->md_len,
494 	};
495 
496 	if (md_buf == NULL) {
497 		return 0;
498 	}
499 
500 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
501 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
502 	rc = spdk_dif_ctx_init(&dif_ctx,
503 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
504 			       bdev->dif_is_head_of_md, bdev->dif_type,
505 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
506 			       0, 0, 0, 0, 0, &dif_opts);
507 	if (rc != 0) {
508 		SPDK_ERRLOG("Initialization of DIF context failed\n");
509 		return rc;
510 	}
511 
512 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
513 
514 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
515 	if (rc != 0) {
516 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
517 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
518 	}
519 
520 	return rc;
521 }
522 
523 int
524 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
525 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
526 {
527 	struct spdk_dif_ctx dif_ctx;
528 	struct spdk_dif_error err_blk = {};
529 	int rc;
530 	struct spdk_dif_ctx_init_ext_opts dif_opts;
531 	struct iovec md_iov = {
532 		.iov_base	= md_buf,
533 		.iov_len	= num_blocks * bdev->md_len,
534 	};
535 
536 	if (md_buf == NULL) {
537 		return 0;
538 	}
539 
540 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
541 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
542 	rc = spdk_dif_ctx_init(&dif_ctx,
543 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
544 			       bdev->dif_is_head_of_md, bdev->dif_type,
545 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
546 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
547 	if (rc != 0) {
548 		SPDK_ERRLOG("Initialization of DIF context failed\n");
549 		return rc;
550 	}
551 
552 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
553 	if (rc != 0) {
554 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
555 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
556 	}
557 
558 	return rc;
559 }
560 
561 /**
562  * Raid bdev I/O read/write wrapper for spdk_bdev_readv_blocks_ext function.
563  */
564 int
565 raid_bdev_readv_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
566 			   struct iovec *iov, int iovcnt, uint64_t offset_blocks,
567 			   uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
568 			   struct spdk_bdev_ext_io_opts *opts)
569 {
570 	return spdk_bdev_readv_blocks_ext(base_info->desc, ch, iov, iovcnt,
571 					  base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
572 }
573 
574 /**
575  * Raid bdev I/O read/write wrapper for spdk_bdev_writev_blocks_ext function.
576  */
577 int
578 raid_bdev_writev_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
579 			    struct iovec *iov, int iovcnt, uint64_t offset_blocks,
580 			    uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
581 			    struct spdk_bdev_ext_io_opts *opts)
582 {
583 	int rc;
584 	uint64_t remapped_offset_blocks = base_info->data_offset + offset_blocks;
585 
586 	if (spdk_unlikely(spdk_bdev_get_dif_type(&base_info->raid_bdev->bdev) != SPDK_DIF_DISABLE &&
587 			  base_info->raid_bdev->bdev.dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK)) {
588 
589 		rc = raid_bdev_remap_dix_reftag(opts->metadata, num_blocks, &base_info->raid_bdev->bdev,
590 						remapped_offset_blocks);
591 		if (rc != 0) {
592 			return rc;
593 		}
594 	}
595 
596 	return spdk_bdev_writev_blocks_ext(base_info->desc, ch, iov, iovcnt,
597 					   remapped_offset_blocks, num_blocks, cb, cb_arg, opts);
598 }
599 
/*
 * Complete a raid I/O towards the bdev layer. If the I/O was split around the
 * process offset (see raid_bdev_io_split()), the completion of the first part
 * (higher LBAs, unprocessed range) triggers submission of the second part
 * (lower LBAs) on the processed channel; only after both parts finish is the
 * original bdev_io completed.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* Number of iovs fully consumed before the split point */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The split landed inside this iov: make it cover
					 * only its lower (not yet submitted) bytes */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The second part targets the already processed range */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Both parts done (or the first part failed): restore the original
		 * I/O parameters before completing towards the bdev layer */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		/* For successful reads with DIF reftag checking enabled, remap the
		 * reference tags back to the raid bdev's logical offsets */
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
663 
664 /*
665  * brief:
666  * raid_bdev_io_complete_part - signal the completion of a part of the expected
667  * base bdev IOs and complete the raid_io if this is the final expected IO.
668  * The caller should first set raid_io->base_bdev_io_remaining. This function
669  * will decrement this counter by the value of the 'completed' parameter and
670  * complete the raid_io if the counter reaches 0. The caller is free to
671  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
672  * it can represent e.g. blocks or IOs.
673  * params:
674  * raid_io - pointer to raid_bdev_io
675  * completed - the part of the raid_io that has been completed
676  * status - status of the base IO
677  * returns:
678  * true - if the raid_io is completed
679  * false - otherwise
680  */
681 bool
682 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
683 			   enum spdk_bdev_io_status status)
684 {
685 	assert(raid_io->base_bdev_io_remaining >= completed);
686 	raid_io->base_bdev_io_remaining -= completed;
687 
688 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
689 		raid_io->base_bdev_io_status = status;
690 	}
691 
692 	if (raid_io->base_bdev_io_remaining == 0) {
693 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
694 		return true;
695 	} else {
696 		return false;
697 	}
698 }
699 
700 /*
701  * brief:
702  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
703  * It will try to queue the IOs after storing the context to bdev wait queue logic.
704  * params:
705  * raid_io - pointer to raid_bdev_io
706  * bdev - the block device that the IO is submitted to
707  * ch - io channel
708  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
709  * returns:
710  * none
711  */
void
raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
{
	/* Fill the wait entry embedded in raid_io and register it with the bdev
	 * layer; cb_fn is invoked with raid_io when a bdev_io becomes available. */
	raid_io->waitq_entry.bdev = bdev;
	raid_io->waitq_entry.cb_fn = cb_fn;
	raid_io->waitq_entry.cb_arg = raid_io;
	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
}
721 
722 static void
723 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
724 {
725 	struct raid_bdev_io *raid_io = cb_arg;
726 
727 	spdk_bdev_free_io(bdev_io);
728 
729 	raid_bdev_io_complete_part(raid_io, 1, success ?
730 				   SPDK_BDEV_IO_STATUS_SUCCESS :
731 				   SPDK_BDEV_IO_STATUS_FAILED);
732 }
733 
734 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
735 
/* spdk_bdev_io_wait_cb adapter: resubmit a previously queued reset request */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	raid_bdev_submit_reset_request((struct raid_bdev_io *)_raid_io);
}
743 
744 /*
745  * brief:
746  * raid_bdev_submit_reset_request function submits reset requests
747  * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
748  * which case it will queue it for later submission
749  * params:
750  * raid_io
751  * returns:
752  * none
753  */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* Zero remaining means this is the initial submission (not an -ENOMEM
	 * retry): expect one completion per base bdev slot. */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from the last submitted slot when re-entered after -ENOMEM */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		/* Missing base bdevs (no channel) are counted as completed */
		if (base_ch == NULL) {
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Out of bdev_ios: retry from this slot when one becomes free */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
793 
/*
 * Split raid_io in place at split_offset (blocks, relative to the start of the
 * I/O): raid_io is adjusted to cover only the upper part [split_offset, end)
 * and enough state is saved in raid_io->split to later submit and restore the
 * lower part in raid_bdev_io_complete().
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Advance the iov array to the split point. If the split lands inside
	 * an iov, save its original value and trim it to the upper part. */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iov boundary - nothing to trim */
				raid_io->split.iov = NULL;
			} else {
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
831 
/*
 * Route a read/write to the raid module, accounting for a running background
 * process: I/O entirely below the process offset uses the processed child
 * channel; I/O straddling the offset is split so the unprocessed part goes
 * first (see raid_bdev_io_split()/raid_bdev_io_complete()).
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
864 
865 /*
866  * brief:
867  * Callback function to spdk_bdev_io_get_buf.
868  * params:
869  * ch - pointer to raid bdev io channel
870  * bdev_io - pointer to parent bdev_io on raid bdev device
871  * success - True if buffer is allocated or false otherwise.
872  * returns:
873  * none
874  */
875 static void
876 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
877 		     bool success)
878 {
879 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
880 
881 	if (!success) {
882 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
883 		return;
884 	}
885 
886 	raid_bdev_submit_rw_request(raid_io);
887 }
888 
889 void
890 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
891 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
892 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
893 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
894 {
895 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
896 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
897 
898 	raid_io->type = type;
899 	raid_io->offset_blocks = offset_blocks;
900 	raid_io->num_blocks = num_blocks;
901 	raid_io->iovs = iovs;
902 	raid_io->iovcnt = iovcnt;
903 	raid_io->memory_domain = memory_domain;
904 	raid_io->memory_domain_ctx = memory_domain_ctx;
905 	raid_io->md_buf = md_buf;
906 
907 	raid_io->raid_bdev = raid_bdev;
908 	raid_io->raid_ch = raid_ch;
909 	raid_io->base_bdev_io_remaining = 0;
910 	raid_io->base_bdev_io_submitted = 0;
911 	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
912 	raid_io->completion_cb = NULL;
913 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
914 }
915 
916 /*
917  * brief:
918  * raid_bdev_submit_request function is the submit_request function pointer of
919  * raid bdev function table. This is used to submit the io on raid_bdev to below
920  * layers.
921  * params:
922  * ch - pointer to raid bdev io channel
923  * bdev_io - pointer to parent bdev_io on raid bdev device
924  * returns:
925  * none
926  */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need a data buffer allocated before submission */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload I/O is rejected while a background process is running */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
966 
967 /*
968  * brief:
969  * _raid_bdev_io_type_supported checks whether io_type is supported in
970  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
971  * doesn't support, the raid device doesn't supports.
972  *
973  * params:
974  * raid_bdev - pointer to raid bdev context
975  * io_type - io type
976  * returns:
977  * true - io_type is supported
978  * false - io_type is not supported
979  */
980 inline static bool
981 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
982 {
983 	struct raid_base_bdev_info *base_info;
984 
985 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
986 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
987 		if (raid_bdev->module->submit_null_payload_request == NULL) {
988 			return false;
989 		}
990 	}
991 
992 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
993 		if (base_info->desc == NULL) {
994 			continue;
995 		}
996 
997 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
998 			return false;
999 		}
1000 	}
1001 
1002 	return true;
1003 }
1004 
1005 /*
1006  * brief:
1007  * raid_bdev_io_type_supported is the io_supported function for bdev function
1008  * table which returns whether the particular io type is supported or not by
1009  * raid bdev module
1010  * params:
1011  * ctx - pointer to raid bdev context
1012  * type - io type
1013  * returns:
1014  * true - io_type is supported
1015  * false - io_type is not supported
1016  */
1017 static bool
1018 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1019 {
1020 	switch (io_type) {
1021 	case SPDK_BDEV_IO_TYPE_READ:
1022 	case SPDK_BDEV_IO_TYPE_WRITE:
1023 		return true;
1024 
1025 	case SPDK_BDEV_IO_TYPE_FLUSH:
1026 	case SPDK_BDEV_IO_TYPE_RESET:
1027 	case SPDK_BDEV_IO_TYPE_UNMAP:
1028 		return _raid_bdev_io_type_supported(ctx, io_type);
1029 
1030 	default:
1031 		return false;
1032 	}
1033 
1034 	return false;
1035 }
1036 
1037 /*
1038  * brief:
1039  * raid_bdev_get_io_channel is the get_io_channel function table pointer for
1040  * raid bdev. This is used to return the io channel for this raid bdev
1041  * params:
1042  * ctxt - pointer to raid_bdev
1043  * returns:
1044  * pointer to io channel for raid bdev
1045  */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev pointer itself is the registered io_device. */
	return spdk_get_io_channel(ctxt);
}
1053 
1054 void
1055 raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
1056 {
1057 	struct raid_base_bdev_info *base_info;
1058 
1059 	assert(raid_bdev != NULL);
1060 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1061 
1062 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1063 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1064 	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
1065 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1066 	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
1067 	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
1068 	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
1069 	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
1070 				     raid_bdev->num_base_bdevs_operational);
1071 	if (raid_bdev->process) {
1072 		struct raid_bdev_process *process = raid_bdev->process;
1073 		uint64_t offset = process->window_offset;
1074 
1075 		spdk_json_write_named_object_begin(w, "process");
1076 		spdk_json_write_name(w, "type");
1077 		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
1078 		spdk_json_write_named_string(w, "target", process->target->name);
1079 		spdk_json_write_named_object_begin(w, "progress");
1080 		spdk_json_write_named_uint64(w, "blocks", offset);
1081 		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
1082 		spdk_json_write_object_end(w);
1083 		spdk_json_write_object_end(w);
1084 	}
1085 	spdk_json_write_name(w, "base_bdevs_list");
1086 	spdk_json_write_array_begin(w);
1087 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1088 		spdk_json_write_object_begin(w);
1089 		spdk_json_write_name(w, "name");
1090 		if (base_info->name) {
1091 			spdk_json_write_string(w, base_info->name);
1092 		} else {
1093 			spdk_json_write_null(w);
1094 		}
1095 		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
1096 		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
1097 		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
1098 		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
1099 		spdk_json_write_object_end(w);
1100 	}
1101 	spdk_json_write_array_end(w);
1102 }
1103 
1104 /*
1105  * brief:
1106  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1107  * params:
1108  * ctx - pointer to raid_bdev
1109  * w - pointer to json context
1110  * returns:
1111  * 0 - success
1112  * non zero - failure
1113  */
1114 static int
1115 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1116 {
1117 	struct raid_bdev *raid_bdev = ctx;
1118 
1119 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1120 
1121 	/* Dump the raid bdev configuration related information */
1122 	spdk_json_write_named_object_begin(w, "raid");
1123 	raid_bdev_write_info_json(raid_bdev, w);
1124 	spdk_json_write_object_end(w);
1125 
1126 	return 0;
1127 }
1128 
1129 /*
1130  * brief:
1131  * raid_bdev_write_config_json is the function table pointer for raid bdev
1132  * params:
1133  * bdev - pointer to spdk_bdev
1134  * w - pointer to json context
1135  * returns:
1136  * none
1137  */
static void
raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct raid_bdev *raid_bdev = bdev->ctxt;
	struct raid_base_bdev_info *base_info;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->superblock_enabled) {
		/* raid bdev configuration is stored in the superblock */
		return;
	}

	/* Emit a "bdev_raid_create" RPC call that would recreate this bdev. */
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);

	/* Only base bdevs with an open descriptor are listed. */
	spdk_json_write_named_array_begin(w, "base_bdevs");
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc) {
			spdk_json_write_string(w, spdk_bdev_desc_get_bdev(base_info->desc)->name);
		}
	}
	spdk_json_write_array_end(w);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1173 
/*
 * Collect the memory domains of all base bdevs. Returns the total number of
 * domains (also when 'domains' is NULL or 'array_size' is too small - only the
 * count is reported then), or a negative errno on failure.
 */
static int
raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_base_bdev_info *base_info;
	int domains_count = 0, rc = 0;

	if (raid_bdev->module->memory_domains_supported == false) {
		return 0;
	}

	/* Hold the lock across both loops so the base bdev set cannot change between them. */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);

	/* First loop to get the number of memory domains */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
		if (rc < 0) {
			goto out;
		}
		domains_count += rc;
	}

	/* Caller's array missing or too small - report the count only. */
	if (!domains || array_size < domains_count) {
		goto out;
	}

	/* Second loop actually fills the caller's array. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
		if (rc < 0) {
			goto out;
		}
		domains += rc;
		array_size -= rc;
	}
out:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (rc < 0) {
		return rc;
	}

	return domains_count;
}
1223 
1224 /* g_raid_bdev_fn_table is the function table for raid bdev */
/* g_raid_bdev_fn_table is the function table for raid bdev */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1234 
1235 struct raid_bdev *
1236 raid_bdev_find_by_name(const char *name)
1237 {
1238 	struct raid_bdev *raid_bdev;
1239 
1240 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1241 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1242 			return raid_bdev;
1243 		}
1244 	}
1245 
1246 	return NULL;
1247 }
1248 
/* Mapping of accepted raid level strings (long and short forms) to enum values.
 * Terminated by a zeroed entry (name == NULL).
 */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1262 
/* Human-readable names for raid bdev states, indexed by enum raid_bdev_state. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};
1269 
/* Human-readable names for background process types, indexed by enum raid_process_type. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1275 
/* We have to use the typedef in the function declaration to appease astyle.
 * Used as return types of raid_bdev_str_to_level() and raid_bdev_str_to_state() below.
 */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1279 
1280 raid_level_t
1281 raid_bdev_str_to_level(const char *str)
1282 {
1283 	unsigned int i;
1284 
1285 	assert(str != NULL);
1286 
1287 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1288 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1289 			return g_raid_level_names[i].value;
1290 		}
1291 	}
1292 
1293 	return INVALID_RAID_LEVEL;
1294 }
1295 
1296 const char *
1297 raid_bdev_level_to_str(enum raid_level level)
1298 {
1299 	unsigned int i;
1300 
1301 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1302 		if (g_raid_level_names[i].value == level) {
1303 			return g_raid_level_names[i].name;
1304 		}
1305 	}
1306 
1307 	return "";
1308 }
1309 
1310 raid_bdev_state_t
1311 raid_bdev_str_to_state(const char *str)
1312 {
1313 	unsigned int i;
1314 
1315 	assert(str != NULL);
1316 
1317 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1318 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1319 			break;
1320 		}
1321 	}
1322 
1323 	return i;
1324 }
1325 
1326 const char *
1327 raid_bdev_state_to_str(enum raid_bdev_state state)
1328 {
1329 	if (state >= RAID_BDEV_STATE_MAX) {
1330 		return "";
1331 	}
1332 
1333 	return g_raid_state_names[state];
1334 }
1335 
1336 const char *
1337 raid_bdev_process_to_str(enum raid_process_type value)
1338 {
1339 	if (value >= RAID_PROCESS_MAX) {
1340 		return "";
1341 	}
1342 
1343 	return g_raid_process_type_names[value];
1344 }
1345 
1346 /*
1347  * brief:
1348  * raid_bdev_fini_start is called when bdev layer is starting the
1349  * shutdown process
1350  * params:
1351  * none
1352  * returns:
1353  * none
1354  */
static void
raid_bdev_fini_start(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
	/* Flag checked by other raid bdev code paths to detect application shutdown. */
	g_shutdown_started = true;
}
1361 
1362 /*
1363  * brief:
1364  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1365  * params:
1366  * none
1367  * returns:
1368  * none
1369  */
static void
raid_bdev_exit(void)
{
	struct raid_bdev *raid_bdev, *tmp;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");

	/* SAFE variant is required: cleanup presumably unlinks each raid bdev
	 * from g_raid_bdev_list while iterating - TODO confirm in raid_bdev_cleanup_and_free.
	 */
	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
		raid_bdev_cleanup_and_free(raid_bdev);
	}
}
1381 
/* Emit a "bdev_raid_set_options" RPC call reproducing the current module options. */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1395 
/* config_json callback of the raid bdev module; dumps module-level options only. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1403 
1404 /*
1405  * brief:
1406  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1407  * module
1408  * params:
1409  * none
1410  * returns:
1411  * size of spdk_bdev_io context for raid
1412  */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	/* Size of the per-IO context carved out of each spdk_bdev_io's driver_ctx. */
	return sizeof(struct raid_bdev_io);
}
1419 
/* Module descriptor registered with the bdev layer. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1432 
1433 /*
1434  * brief:
1435  * raid_bdev_init is the initialization function for raid bdev module
1436  * params:
1437  * none
1438  * returns:
1439  * 0 - success
1440  * non zero - failure
1441  */
static int
raid_bdev_init(void)
{
	/* Nothing to initialize at module load; raid bdevs are created on demand. */
	return 0;
}
1447 
1448 static int
1449 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1450 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1451 		  struct raid_bdev **raid_bdev_out)
1452 {
1453 	struct raid_bdev *raid_bdev;
1454 	struct spdk_bdev *raid_bdev_gen;
1455 	struct raid_bdev_module *module;
1456 	struct raid_base_bdev_info *base_info;
1457 	uint8_t min_operational;
1458 
1459 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1460 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1461 		return -EINVAL;
1462 	}
1463 
1464 	if (raid_bdev_find_by_name(name) != NULL) {
1465 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1466 		return -EEXIST;
1467 	}
1468 
1469 	if (level == RAID1) {
1470 		if (strip_size != 0) {
1471 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1472 			return -EINVAL;
1473 		}
1474 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1475 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1476 		return -EINVAL;
1477 	}
1478 
1479 	module = raid_bdev_module_find(level);
1480 	if (module == NULL) {
1481 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1482 		return -EINVAL;
1483 	}
1484 
1485 	assert(module->base_bdevs_min != 0);
1486 	if (num_base_bdevs < module->base_bdevs_min) {
1487 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1488 			    module->base_bdevs_min,
1489 			    raid_bdev_level_to_str(level));
1490 		return -EINVAL;
1491 	}
1492 
1493 	switch (module->base_bdevs_constraint.type) {
1494 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1495 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1496 		break;
1497 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1498 		min_operational = module->base_bdevs_constraint.value;
1499 		break;
1500 	case CONSTRAINT_UNSET:
1501 		if (module->base_bdevs_constraint.value != 0) {
1502 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1503 				    (uint8_t)module->base_bdevs_constraint.value, name);
1504 			return -EINVAL;
1505 		}
1506 		min_operational = num_base_bdevs;
1507 		break;
1508 	default:
1509 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1510 			    (uint8_t)module->base_bdevs_constraint.type,
1511 			    raid_bdev_level_to_str(module->level));
1512 		return -EINVAL;
1513 	};
1514 
1515 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1516 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1517 			    raid_bdev_level_to_str(module->level));
1518 		return -EINVAL;
1519 	}
1520 
1521 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1522 	if (!raid_bdev) {
1523 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1524 		return -ENOMEM;
1525 	}
1526 
1527 	spdk_spin_init(&raid_bdev->base_bdev_lock);
1528 	raid_bdev->module = module;
1529 	raid_bdev->num_base_bdevs = num_base_bdevs;
1530 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1531 					   sizeof(struct raid_base_bdev_info));
1532 	if (!raid_bdev->base_bdev_info) {
1533 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1534 		raid_bdev_free(raid_bdev);
1535 		return -ENOMEM;
1536 	}
1537 
1538 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1539 		base_info->raid_bdev = raid_bdev;
1540 	}
1541 
1542 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1543 	 * internally and set later.
1544 	 */
1545 	raid_bdev->strip_size = 0;
1546 	raid_bdev->strip_size_kb = strip_size;
1547 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1548 	raid_bdev->level = level;
1549 	raid_bdev->min_base_bdevs_operational = min_operational;
1550 	raid_bdev->superblock_enabled = superblock_enabled;
1551 
1552 	raid_bdev_gen = &raid_bdev->bdev;
1553 
1554 	raid_bdev_gen->name = strdup(name);
1555 	if (!raid_bdev_gen->name) {
1556 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1557 		raid_bdev_free(raid_bdev);
1558 		return -ENOMEM;
1559 	}
1560 
1561 	raid_bdev_gen->product_name = "Raid Volume";
1562 	raid_bdev_gen->ctxt = raid_bdev;
1563 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1564 	raid_bdev_gen->module = &g_raid_if;
1565 	raid_bdev_gen->write_cache = 0;
1566 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1567 
1568 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1569 
1570 	*raid_bdev_out = raid_bdev;
1571 
1572 	return 0;
1573 }
1574 
1575 /*
1576  * brief:
1577  * raid_bdev_create allocates raid bdev based on passed configuration
1578  * params:
1579  * name - name for raid bdev
1580  * strip_size - strip size in KB
1581  * num_base_bdevs - number of base bdevs
1582  * level - raid level
1583  * superblock_enabled - true if raid should have superblock
1584  * uuid - uuid to set for the bdev
1585  * raid_bdev_out - the created raid bdev
1586  * returns:
1587  * 0 - success
1588  * non zero - failure
1589  */
int
raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
		 struct raid_bdev **raid_bdev_out)
{
	struct raid_bdev *raid_bdev;
	int rc;

	assert(uuid != NULL);

	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
			       &raid_bdev);
	if (rc != 0) {
		return rc;
	}

	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
		/* we need to have the uuid to store in the superblock before the bdev is registered */
		spdk_uuid_generate(&raid_bdev->bdev.uuid);
	}

	/* All base bdevs are expected to be operational for a newly created raid. */
	raid_bdev->num_base_bdevs_operational = num_base_bdevs;

	*raid_bdev_out = raid_bdev;

	return 0;
}
1617 
/* App-thread continuation of unregistration: close the raid bdev's internal
 * self-descriptor so the bdev layer can finish the unregister.
 */
static void
_raid_bdev_unregistering_cont(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;

	spdk_bdev_close(raid_bdev->self_desc);
	raid_bdev->self_desc = NULL;
}
1626 
/* Forward the unregistration continuation to the app thread, where the
 * self-descriptor must be closed.
 */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1632 
1633 static int
1634 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1635 {
1636 	struct raid_process_finish_action *finish_action;
1637 
1638 	assert(spdk_get_thread() == process->thread);
1639 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1640 
1641 	finish_action = calloc(1, sizeof(*finish_action));
1642 	if (finish_action == NULL) {
1643 		return -ENOMEM;
1644 	}
1645 
1646 	finish_action->cb = cb;
1647 	finish_action->cb_ctx = cb_ctx;
1648 
1649 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1650 
1651 	return 0;
1652 }
1653 
/* Runs on the process thread: request the background process to stop because
 * the raid bdev is being unregistered, and arrange for the unregistration to
 * continue once the process has finished.
 */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	/* Preserve an earlier failure status; only mark cancellation if none set. */
	if (process->status == 0) {
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1672 
/* Event callback for the raid bdev's internal self-descriptor. On REMOVE
 * (i.e. the raid bdev is being unregistered), stop any running background
 * process first; otherwise continue unregistration immediately.
 */
static void
raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct raid_bdev *raid_bdev = event_ctx;

	if (type == SPDK_BDEV_EVENT_REMOVE) {
		if (raid_bdev->process != NULL) {
			/* The process state must be changed on its own thread. */
			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
					     raid_bdev->process);
		} else {
			raid_bdev_unregistering_cont(raid_bdev);
		}
	}
}
1687 
/* Final stage of configuration: register the io_device and the bdev, then open
 * an internal descriptor on it. On any failure the module is stopped and the
 * raid bdev returns to the CONFIGURING state.
 */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		/* Registration succeeded, so the bdev must be unregistered again. */
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Unwind: stop the raid module and the io_device, fall back to CONFIGURING. */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1736 
/* Completion callback for the superblock write issued by raid_bdev_configure().
 * Continues configuration on success; stops the raid module on failure.
 */
static void
raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	if (status == 0) {
		raid_bdev_configure_cont(raid_bdev);
	} else {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		if (raid_bdev->module->stop != NULL) {
			raid_bdev->module->stop(raid_bdev);
		}
	}
}
1750 
1751 /*
1752  * brief:
1753  * If raid bdev config is complete, then only register the raid bdev to
1754  * bdev layer and remove this raid bdev from configuring list and
1755  * insert the raid bdev to configured list
1756  * params:
1757  * raid_bdev - pointer to raid bdev
1758  * returns:
1759  * 0 - success
1760  * non zero - failure
1761  */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
	raid_bdev->blocklen_shift = spdk_u32log2(data_block_size);

	/* Let the level-specific module (raid0/raid1/...) finish its setup. */
	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	if (raid_bdev->superblock_enabled) {
		if (raid_bdev->sb == NULL) {
			/* Fresh raid bdev: allocate and initialize a new superblock. */
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			/* Existing superblock (e.g. loaded from disk): validate it
			 * against the assembled bdev's geometry.
			 */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		/* Configuration continues asynchronously in raid_bdev_configure_write_sb_cb. */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1821 
1822 /*
1823  * brief:
1824  * If raid bdev is online and registered, change the bdev state to
1825  * configuring and unregister this raid device. Queue this raid device
1826  * in configuring list
1827  * params:
1828  * raid_bdev - pointer to raid bdev
1829  * cb_fn - callback function
1830  * cb_arg - argument to callback function
1831  * returns:
1832  * none
1833  */
static void
raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
		      void *cb_arg)
{
	/* Not online - nothing to unregister; report success to the caller. */
	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
		return;
	}

	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");

	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
}
1850 
1851 /*
1852  * brief:
1853  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1854  * params:
1855  * base_bdev - pointer to base bdev
1856  * returns:
1857  * base bdev info if found, otherwise NULL.
1858  */
1859 static struct raid_base_bdev_info *
1860 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1861 {
1862 	struct raid_bdev *raid_bdev;
1863 	struct raid_base_bdev_info *base_info;
1864 
1865 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1866 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1867 			if (base_info->desc != NULL &&
1868 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1869 				return base_info;
1870 			}
1871 		}
1872 	}
1873 
1874 	return NULL;
1875 }
1876 
/* Finish a base bdev removal: clear the scheduled flag, update the operational
 * count, and either deconfigure the raid bdev (not enough base bdevs left) or
 * invoke the removal callback with the given status.
 */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			/* The removal callback is passed on to the deconfigure path instead. */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1898 
/* Completion callback for the superblock update triggered by a base bdev
 * removal; logs a failure and completes the removal either way.
 */
static void
raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1911 
/* Called after the raid bdev has been unquiesced during base bdev removal.
 * Releases the base bdev's resources and, if a superblock exists, marks the
 * removed slot as failed and persists the superblock (removal then completes
 * asynchronously in raid_bdev_remove_base_bdev_write_sb_cb).
 */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	raid_bdev_free_base_bdev_resource(base_info);
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry of the removed slot, if still configured. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* TODO: distinguish between failure and intentional removal */
				sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1949 
/* Per-channel step of base bdev removal (run via spdk_for_each_channel):
 * drop the IO channel of the removed base bdev from this raid channel.
 */
static void
raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	uint8_t idx = raid_bdev_base_bdev_slot(base_info);

	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);

	if (raid_ch->base_channel[idx] != NULL) {
		spdk_put_io_channel(raid_ch->base_channel[idx]);
		raid_ch->base_channel[idx] = NULL;
	}

	/* Also clear the slot in the process's "processed" shadow channel, if any. */
	if (raid_ch->process.ch_processed != NULL) {
		raid_ch->process.ch_processed->base_channel[idx] = NULL;
	}

	spdk_for_each_channel_continue(i, 0);
}
1971 
/* All channels have dropped the removed base bdev; unquiesce the raid bdev
 * and continue in raid_bdev_remove_base_bdev_on_unquiesced.
 */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1981 
1982 static void
1983 raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
1984 {
1985 	struct raid_base_bdev_info *base_info = ctx;
1986 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1987 
1988 	if (status != 0) {
1989 		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
1990 			    raid_bdev->bdev.name, spdk_strerror(-status));
1991 		raid_bdev_remove_base_bdev_done(base_info, status);
1992 		return;
1993 	}
1994 
1995 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
1996 			      raid_bdev_channels_remove_base_bdev_done);
1997 }
1998 
1999 static int
2000 raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
2001 {
2002 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2003 
2004 	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
2005 				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
2006 }
2007 
/* Context for removing a base bdev while a background process is running on the raid bdev */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The background process at the time the removal was scheduled */
	struct raid_bdev_process *process;
	/* The base bdev being removed */
	struct raid_base_bdev_info *base_info;
	/* Number of operational base bdevs, counted on the app thread before sending
	 * the message - see raid_bdev_process_base_bdev_remove() */
	uint8_t num_base_bdevs_operational;
};
2013 
/*
 * Continues a base bdev removal on the app thread after the process thread
 * decided it can proceed (or after the process stopped).
 */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc = raid_bdev_remove_base_bdev_quiesce(base_info);

	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2025 
2026 static void
2027 raid_bdev_process_base_bdev_remove_cont(void *_ctx)
2028 {
2029 	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2030 	struct raid_base_bdev_info *base_info = ctx->base_info;
2031 
2032 	free(ctx);
2033 
2034 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
2035 			     base_info);
2036 }
2037 
/*
 * Decides, on the process thread, whether a base bdev removal requires the
 * background process to be stopped first. The process must stop if the
 * removed bdev is the process target or if the removal would drop the number
 * of operational base bdevs to the minimum.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the rest of the removal until the process has fully stopped. */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		/* The process is being stopped because of the removal, not an error. */
		process->status = -ENODEV;
	}
}
2068 
/*
 * Schedules removal of a base bdev while a background process is running.
 * Must be called on the app thread; the decision is made on the process
 * thread in _raid_bdev_process_base_bdev_remove().
 *
 * Returns 0 on success or -ENOMEM if the message context cannot be allocated.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	/* Note: base_info is reused as the loop cursor below; the bdev being removed
	 * was already saved in ctx->base_info above. */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (!base_info->remove_scheduled && base_info->desc != NULL) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2104 
/*
 * Removes a base bdev from its raid bdev. Three cases are handled:
 * - the raid bdev is not online: the base bdev resources are released
 *   synchronously and the raid bdev is freed if this was its last base bdev;
 * - the raid bdev cannot tolerate losing a member: the whole raid bdev is
 *   deconfigured;
 * - otherwise: IO is quiesced (going through the background process first,
 *   if one is running) and the base bdev is detached asynchronously.
 * Must be called on the app thread. cb_fn is invoked when the removal
 * completes; for the asynchronous paths this happens later.
 *
 * Returns 0 on success, -ENODEV if a removal is already scheduled for this
 * base bdev, or an error from scheduling the quiesce.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (base_info->remove_scheduled) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		/* The callback is invoked later, when the asynchronous removal completes. */
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		if (raid_bdev->process != NULL) {
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		if (ret != 0) {
			/* Nothing was scheduled - allow a later retry. */
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2161 
/*
 * brief:
 * raid_bdev_remove_base_bdev function is called by the layers below when a base_bdev
 * is removed. This function checks whether this base bdev is part of any raid bdev.
 * If yes, it takes the necessary action on that particular raid bdev.
 * params:
 * base_bdev - pointer to the base bdev which got removed
 * cb_fn - callback function
 * cb_ctx - argument to the callback function
 * returns:
 * 0 - success
 * non zero - failure
 */
2175 int
2176 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2177 {
2178 	struct raid_base_bdev_info *base_info;
2179 
2180 	/* Find the raid_bdev which has claimed this base_bdev */
2181 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2182 	if (!base_info) {
2183 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2184 		return -ENODEV;
2185 	}
2186 
2187 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2188 }
2189 
2190 static void
2191 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2192 {
2193 	if (status != 0) {
2194 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2195 			    raid_bdev->bdev.name, spdk_strerror(-status));
2196 	}
2197 }
/*
 * brief:
 * raid_bdev_resize_base_bdev function is called by the layers below when a base_bdev
 * is resized. This function checks whether the smallest size among the base_bdevs has
 * changed. If yes, it calls the module handler to resize the raid_bdev, if implemented.
 * params:
 * base_bdev - pointer to the base bdev which got resized.
 * returns:
 * none
 */
static void
raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
{
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	uint64_t blockcnt_old;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");

	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);

	/* Find the raid_bdev which has claimed this base_bdev */
	if (!base_info) {
		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
		return;
	}
	raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);

	/* Record the new size even if the raid bdev itself does not grow. */
	base_info->blockcnt = base_bdev->blockcnt;

	/* Resizing the raid bdev is optional per raid module. */
	if (!raid_bdev->module->resize) {
		return;
	}

	blockcnt_old = raid_bdev->bdev.blockcnt;
	/* The module returns false when the raid bdev size did not change. */
	if (raid_bdev->module->resize(raid_bdev) == false) {
		return;
	}

	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);

	/* Persist the new per-base-bdev data sizes and raid size in the superblock. */
	if (raid_bdev->superblock_enabled) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t i;

		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
				sb_base_bdev->data_size = base_info->data_size;
			}
		}
		sb->raid_size = raid_bdev->bdev.blockcnt;
		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
	}
}
2262 
2263 /*
2264  * brief:
2265  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2266  * triggers asynchronous event.
2267  * params:
2268  * type - event details.
2269  * bdev - bdev that triggered event.
2270  * event_ctx - context for event.
2271  * returns:
2272  * none
2273  */
2274 static void
2275 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2276 			  void *event_ctx)
2277 {
2278 	int rc;
2279 
2280 	switch (type) {
2281 	case SPDK_BDEV_EVENT_REMOVE:
2282 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2283 		if (rc != 0) {
2284 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2285 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2286 		}
2287 		break;
2288 	case SPDK_BDEV_EVENT_RESIZE:
2289 		raid_bdev_resize_base_bdev(bdev);
2290 		break;
2291 	default:
2292 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2293 		break;
2294 	}
2295 }
2296 
2297 /*
2298  * brief:
2299  * Deletes the specified raid bdev
2300  * params:
2301  * raid_bdev - pointer to raid bdev
2302  * cb_fn - callback function
2303  * cb_arg - argument to callback function
2304  */
void
raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);

	/* Only a single delete may be in progress at a time. */
	if (raid_bdev->destroy_started) {
		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
			      raid_bdev->bdev.name);
		if (cb_fn) {
			cb_fn(cb_arg, -EALREADY);
		}
		return;
	}

	raid_bdev->destroy_started = true;

	/* Schedule removal of every base bdev as part of the delete. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		base_info->remove_scheduled = true;

		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
			/*
			 * As raid bdev is not registered yet or already unregistered,
			 * so cleanup should be done here itself.
			 */
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* There is no base bdev for this raid, so free the raid device. */
		raid_bdev_cleanup_and_free(raid_bdev);
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
	} else {
		/* Unregister the bdev; remaining cleanup happens asynchronously. */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
	}
}
2345 
2346 static void
2347 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2348 {
2349 	if (status != 0) {
2350 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2351 			    raid_bdev->bdev.name, spdk_strerror(-status));
2352 	}
2353 }
2354 
/*
 * Updates the superblock after a background process finished successfully:
 * any base bdev slot that became configured during the process (e.g. the
 * rebuild target) is marked RAID_SB_BASE_BDEV_CONFIGURED and gets its uuid
 * recorded. Runs on the app thread.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
2379 
2380 static void raid_bdev_process_free(struct raid_bdev_process *process);
2381 
/*
 * Final step of stopping a process: runs all queued finish actions, frees
 * the process and exits its dedicated thread. Runs on the process thread.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_process_finish_action *finish_action;

	/* TAILQ_FIRST is re-read each iteration, so actions queued by a callback
	 * are executed as well. */
	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
		finish_action->cb(finish_action->cb_ctx);
		free(finish_action);
	}

	raid_bdev_process_free(process);

	/* The process thread was created solely for this process. */
	spdk_thread_exit(spdk_get_thread());
}
2398 
/*
 * Completion of removing the process target after a failed process.
 * A removal error is only logged; the teardown continues regardless on the
 * process thread.
 */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2410 
/*
 * Called on the app thread once the raid bdev has been unquiesced after the
 * process finished. If the process failed, the target base bdev is removed
 * from the raid bdev (unless its removal is already under way); otherwise
 * the teardown continues directly on the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		struct raid_base_bdev_info *target = process->target;

		/* Drop the target the process could not complete on. */
		if (target->desc != NULL && target->remove_scheduled == false) {
			_raid_bdev_remove_base_bdev(target, raid_bdev_process_finish_target_removed, process);
			return;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2431 
2432 static void
2433 raid_bdev_process_finish_unquiesce(void *ctx)
2434 {
2435 	struct raid_bdev_process *process = ctx;
2436 	int rc;
2437 
2438 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2439 				 raid_bdev_process_finish_unquiesced, process);
2440 	if (rc != 0) {
2441 		raid_bdev_process_finish_unquiesced(process, rc);
2442 	}
2443 }
2444 
/*
 * Runs on the process thread after every channel finished the process.
 * Releases the process' IO channel, logs the result, schedules a superblock
 * update on success and hands off to the app thread to unquiesce the raid
 * bdev.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* raid_ch is NULL if the process failed before acquiring its IO channel. */
	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		/* The superblock must be written on the app thread. */
		if (raid_bdev->superblock_enabled) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2476 
/*
 * All channels completed their per-channel process cleanup - continue the
 * finish sequence on the process thread.
 */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2484 
/*
 * Per-channel cleanup when the process finishes. On success, the channel's
 * process target channel becomes the regular base bdev channel for the
 * target's slot, so IO flows to the newly completed member.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* Ownership of target_ch moves to base_channel - no put here. */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2503 
/*
 * Quiesce completion for the process finish sequence. Detaches the process
 * from the raid bdev and starts the per-channel cleanup.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): on quiesce failure the finish sequence stops here and the
		 * process appears to remain in STOPPING forever - confirm intended. */
		return;
	}

	/* From here on, new IO no longer sees a process on the raid bdev. */
	raid_bdev->process = NULL;
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2519 
2520 static void
2521 _raid_bdev_process_finish(void *ctx)
2522 {
2523 	struct raid_bdev_process *process = ctx;
2524 	int rc;
2525 
2526 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2527 			       raid_bdev_process_finish_quiesced, process);
2528 	if (rc != 0) {
2529 		raid_bdev_process_finish_quiesced(ctx, rc);
2530 	}
2531 }
2532 
/*
 * Kicks off the process finish sequence, which must start on the app thread.
 */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2538 
2539 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2540 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2541 
/*
 * Initiates stopping of the process with the given status; the first
 * non-zero status is preserved. A no-op if stopping was already initiated.
 * The actual teardown runs via raid_bdev_process_thread_run() once the
 * current window is wound down.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Release the current window first if it is still locked. */
	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2564 
/*
 * Unquiesce completion for the current window: advances the process offset
 * past the processed window and moves on to the next one (or to the final
 * teardown if the process is stopping).
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	/* window_size is the number of blocks actually processed in this window. */
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2581 
/*
 * Resumes IO to the currently locked window range. Completion (or an
 * immediate failure) is handled by raid_bdev_process_window_range_unlocked().
 */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2596 
/*
 * Every channel now sees the updated process offset - unlock the finished
 * window.
 */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2604 
2605 static void
2606 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2607 {
2608 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2609 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2610 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2611 
2612 	raid_ch->process.offset = process->window_offset + process->window_size;
2613 
2614 	spdk_for_each_channel_continue(i, 0);
2615 }
2616 
/*
 * Completes a process request previously submitted by the raid module.
 * Returns the request to the free pool and accounts for the processed
 * blocks. When the whole window completes, either finishes the process (if
 * any request failed) or propagates the new process offset to all channels
 * before unlocking the window. Must be called on the process thread.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	/* The request is reusable for the next window immediately. */
	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Remember the first failure; the window still has to drain. */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2642 
/*
 * Submits one process request covering up to num_blocks at offset_blocks.
 * Returns the number of blocks actually submitted (the raid module may
 * accept fewer than requested), 0 if no free request is available or the
 * module cannot take more work right now, or a negative errno on submission
 * failure (also recorded in window_status).
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests are in flight - their completions will drive the window. */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may have accepted fewer blocks than requested. */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2678 
/*
 * Fills the current (locked) window with process requests. The window end is
 * bounded by max_window_size and the raid bdev size. If anything was
 * submitted, completions drive the rest of the window; otherwise the process
 * is finished with the status recorded by the submit path.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* window_size tracks how many blocks this window actually covers. */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2703 
/*
 * Quiesce-range completion for the next window. Starts submitting process
 * requests, unless a stop was requested while the lock was being taken, in
 * which case the window is released again.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2724 
/*
 * Main loop step of the process thread: finishes the teardown when the
 * process is stopping or complete, otherwise clamps the next window to the
 * remaining blocks and quiesces its LBA range before submitting requests.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	/* The whole raid bdev has been processed. */
	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* The last window may be smaller than the configured window size. */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2756 
/*
 * First message handled on the newly created process thread: acquires the
 * raid bdev IO channel used for submitting process requests and starts
 * processing. Fails the process with -ENOMEM if no channel can be obtained.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2781 
2782 static void
2783 raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
2784 {
2785 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2786 
2787 	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
2788 	raid_bdev_process_free(process);
2789 
2790 	/* TODO: update sb */
2791 }
2792 
/*
 * Per-channel rollback of the process setup when starting the process
 * failed.
 */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2803 
/*
 * Completion of the per-channel process setup. On success, creates the
 * dedicated process thread, attaches the process to the raid bdev and starts
 * it; on any failure, rolls back the per-channel setup and aborts.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* TODO: we may need to abort if a base bdev was removed before we got here */

	/* e.g. "<raid name>_rebuild" */
	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2840 
/*
 * Per-channel setup for a starting process; the setup result is forwarded to
 * the channel iteration.
 */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	spdk_for_each_channel_continue(i, raid_bdev_ch_process_setup(raid_ch, process));
}
2853 
2854 static void
2855 raid_bdev_process_start(struct raid_bdev_process *process)
2856 {
2857 	struct raid_bdev *raid_bdev = process->raid_bdev;
2858 
2859 	assert(raid_bdev->module->submit_process_request != NULL);
2860 
2861 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
2862 			      raid_bdev_channels_start_process_done);
2863 }
2864 
2865 static void
2866 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
2867 {
2868 	spdk_dma_free(process_req->iov.iov_base);
2869 	spdk_dma_free(process_req->md_buf);
2870 	free(process_req);
2871 }
2872 
/*
 * Allocates a single process request with a DMA-able, 4096-byte aligned data
 * buffer sized for a full window, plus a separate metadata buffer if the
 * raid bdev uses separate metadata. Returns NULL on allocation failure.
 */
static struct raid_bdev_process_request *
raid_bdev_process_alloc_request(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;

	process_req = calloc(1, sizeof(*process_req));
	if (process_req == NULL) {
		return NULL;
	}

	process_req->process = process;
	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
	if (process_req->iov.iov_base == NULL) {
		free(process_req);
		return NULL;
	}
	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
		if (process_req->md_buf == NULL) {
			/* Frees the data buffer allocated above as well. */
			raid_bdev_process_request_free(process_req);
			return NULL;
		}
	}

	return process_req;
}
2901 
2902 static void
2903 raid_bdev_process_free(struct raid_bdev_process *process)
2904 {
2905 	struct raid_bdev_process_request *process_req;
2906 
2907 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2908 		TAILQ_REMOVE(&process->requests, process_req, link);
2909 		raid_bdev_process_request_free(process_req);
2910 	}
2911 
2912 	free(process);
2913 }
2914 
/*
 * Allocates a process and its pool of RAID_BDEV_PROCESS_MAX_QD preallocated
 * requests. The maximum window size is derived from the configured
 * process_window_size_kb, converted to data blocks and raised to at least
 * the bdev's write unit size. Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			/* Releases already-allocated requests too. */
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
2949 
/*
 * Starts a rebuild process with the given base bdev as the target. Must be
 * called on the app thread. Returns 0 on success or -ENOMEM if the process
 * cannot be allocated.
 */
static int
raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
	if (process == NULL) {
		return -ENOMEM;
	}

	raid_bdev_process_start(process);

	return 0;
}
2966 
2967 static void
2968 raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
2969 {
2970 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2971 	int rc;
2972 
2973 	/* TODO: defer if rebuild in progress on another base bdev */
2974 	assert(raid_bdev->process == NULL);
2975 
2976 	base_info->is_configured = true;
2977 
2978 	raid_bdev->num_base_bdevs_discovered++;
2979 	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
2980 	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
2981 	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);
2982 
2983 	/*
2984 	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
2985 	 * of base bdevs we know to be operational members of the array. Usually this is equal
2986 	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
2987 	 * degraded.
2988 	 */
2989 	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
2990 		rc = raid_bdev_configure(raid_bdev);
2991 		if (rc != 0) {
2992 			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
2993 		}
2994 	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
2995 		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
2996 		raid_bdev->num_base_bdevs_operational++;
2997 		rc = raid_bdev_start_rebuild(base_info);
2998 		if (rc != 0) {
2999 			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
3000 			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
3001 		}
3002 	} else {
3003 		rc = 0;
3004 	}
3005 
3006 	if (base_info->configure_cb != NULL) {
3007 		base_info->configure_cb(base_info->configure_cb_ctx, rc);
3008 	}
3009 }
3010 
3011 static void
3012 raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
3013 		void *ctx)
3014 {
3015 	struct raid_base_bdev_info *base_info = ctx;
3016 
3017 	switch (status) {
3018 	case 0:
3019 		/* valid superblock found */
3020 		SPDK_ERRLOG("Existing raid superblock found on bdev %s\n", base_info->name);
3021 		status = -EEXIST;
3022 		raid_bdev_free_base_bdev_resource(base_info);
3023 		break;
3024 	case -EINVAL:
3025 		/* no valid superblock */
3026 		raid_bdev_configure_base_bdev_cont(base_info);
3027 		return;
3028 	default:
3029 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3030 			    base_info->name, spdk_strerror(-status));
3031 		break;
3032 	}
3033 
3034 	if (base_info->configure_cb != NULL) {
3035 		base_info->configure_cb(base_info->configure_cb_ctx, status);
3036 	}
3037 }
3038 
/*
 * Open, claim and validate a bdev as a base bdev of the raid bdev referenced by
 * base_info. Resolves the name/uuid pair, establishes the data offset and size,
 * verifies metadata-format compatibility with the raid bdev and then either
 * continues configuration directly (existing member) or first checks the bdev
 * for a pre-existing raid superblock (new member).
 *
 * base_info - slot to configure; base_info->desc must be NULL on entry
 * existing  - true when (re-)attaching a known member; skips the superblock check
 * cb_fn/cb_ctx - completion callback, possibly invoked asynchronously
 * Returns 0 on success (or when completion is deferred to a callback),
 * negative errno on failure; on failure all acquired resources are released.
 */
static int
raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
			      raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	const struct spdk_uuid *bdev_uuid;
	int rc;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->desc == NULL);

	/*
	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
	 * before claiming the bdev.
	 */

	if (!spdk_uuid_is_null(&base_info->uuid)) {
		char uuid_str[SPDK_UUID_STRING_LEN];
		const char *bdev_name;

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* UUID of a bdev is registered as its alias */
		bdev = spdk_bdev_get_by_name(uuid_str);
		if (bdev == NULL) {
			return -ENODEV;
		}

		bdev_name = spdk_bdev_get_name(bdev);

		if (base_info->name == NULL) {
			/* uuid-only slot (e.g. created from a superblock) - adopt the bdev's name */
			assert(existing == true);
			base_info->name = strdup(bdev_name);
			if (base_info->name == NULL) {
				return -ENOMEM;
			}
		} else if (strcmp(base_info->name, bdev_name) != 0) {
			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
				    bdev_name, base_info->name);
			return -EINVAL;
		}
	}

	assert(base_info->name != NULL);

	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
	if (rc != 0) {
		if (rc != -ENODEV) {
			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
		}
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	bdev_uuid = spdk_bdev_get_uuid(bdev);

	/* Adopt the bdev's uuid when the slot has none yet, otherwise verify it */
	if (spdk_uuid_is_null(&base_info->uuid)) {
		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
		spdk_bdev_close(desc);
		return -EINVAL;
	}

	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
		spdk_bdev_close(desc);
		return rc;
	}

	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);

	/* Channel used for superblock I/O issued from the app thread */
	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
	if (base_info->app_thread_ch == NULL) {
		SPDK_ERRLOG("Failed to get io channel\n");
		spdk_bdev_module_release_bdev(bdev);
		spdk_bdev_close(desc);
		return -ENOMEM;
	}

	base_info->desc = desc;
	base_info->blockcnt = bdev->blockcnt;

	if (raid_bdev->superblock_enabled) {
		uint64_t data_offset;

		if (base_info->data_offset == 0) {
			/* Reserve room for the superblock at the start of the base bdev */
			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
		} else {
			data_offset = base_info->data_offset;
		}

		if (bdev->optimal_io_boundary != 0) {
			/* Round the offset up to the optimal I/O boundary, but never override
			 * an explicitly requested offset - only warn about it */
			data_offset = spdk_divide_round_up(data_offset,
							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
					     base_info->data_offset, base_info->name, data_offset);
				data_offset = base_info->data_offset;
			}
		}

		base_info->data_offset = data_offset;
	}

	if (base_info->data_offset >= bdev->blockcnt) {
		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
			    base_info->data_offset, bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (base_info->data_size == 0) {
		/* Size 0 means "use everything past the data offset" */
		base_info->data_size = bdev->blockcnt - base_info->data_offset;
	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
			    bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
			    bdev->name);
		rc = -EINVAL;
		goto out;
	}

	/*
	 * Set the raid bdev properties if this is the first base bdev configured,
	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
	 * have the same blocklen and metadata format.
	 */
	if (raid_bdev->bdev.blocklen == 0) {
		raid_bdev->bdev.blocklen = bdev->blocklen;
		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
	} else {
		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
			rc = -EINVAL;
			goto out;
		}

		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev)) {
			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
				    raid_bdev->bdev.name, bdev->name);
			rc = -EINVAL;
			goto out;
		}
	}

	base_info->configure_cb = cb_fn;
	base_info->configure_cb_ctx = cb_ctx;

	if (existing) {
		raid_bdev_configure_base_bdev_cont(base_info);
	} else {
		/* check for existing superblock when using a new bdev */
		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
		if (rc) {
			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
				    bdev->name, spdk_strerror(-rc));
		}
	}
out:
	/* On any failure above, release the descriptor, channel and claim */
	if (rc != 0) {
		raid_bdev_free_base_bdev_resource(base_info);
	}
	return rc;
}
3223 
3224 static int
3225 _raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3226 			   uint64_t data_offset, uint64_t data_size,
3227 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3228 {
3229 	struct raid_base_bdev_info *base_info;
3230 
3231 	assert(name != NULL);
3232 
3233 	if (slot >= raid_bdev->num_base_bdevs) {
3234 		return -EINVAL;
3235 	}
3236 
3237 	base_info = &raid_bdev->base_bdev_info[slot];
3238 
3239 	if (base_info->name != NULL) {
3240 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev '%s'\n",
3241 			    slot, raid_bdev->bdev.name, base_info->name);
3242 		return -EBUSY;
3243 	}
3244 
3245 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3246 		char uuid_str[SPDK_UUID_STRING_LEN];
3247 
3248 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3249 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev with uuid %s\n",
3250 			    slot, raid_bdev->bdev.name, uuid_str);
3251 		return -EBUSY;
3252 	}
3253 
3254 	base_info->name = strdup(name);
3255 	if (base_info->name == NULL) {
3256 		return -ENOMEM;
3257 	}
3258 
3259 	base_info->data_offset = data_offset;
3260 	base_info->data_size = data_size;
3261 
3262 	return raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
3263 }
3264 
/*
 * Attach a bdev to an online raid bdev, e.g. as a replacement member that will
 * be rebuilt. The bdev is placed into the first slot without an open
 * descriptor, reusing that slot's previously established data offset and size.
 * Returns 0 on success, negative errno otherwise.
 */
int
raid_bdev_attach_base_bdev(struct raid_bdev *raid_bdev, struct spdk_bdev *base_bdev,
			   raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_base_bdev_info *base_info = NULL, *iter;
	int rc;

	SPDK_DEBUGLOG(bdev_raid, "attach_base_device: %s\n", base_bdev->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Attaching while a background process (e.g. rebuild) runs is not allowed */
	if (raid_bdev->process != NULL) {
		SPDK_ERRLOG("raid bdev '%s' is in process\n",
			    raid_bdev->bdev.name);
		return -EPERM;
	}

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		SPDK_ERRLOG("raid bdev '%s' must be in online state to attach base bdev\n",
			    raid_bdev->bdev.name);
		return -EINVAL;
	}

	/* Find the first slot whose base bdev is not currently opened */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (iter->desc == NULL) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
			    raid_bdev->bdev.name, base_bdev->name);
		return -EINVAL;
	}

	assert(base_info->is_configured == false);
	assert(base_info->data_size != 0);

	/* Lock so that concurrent readers of base bdev info see a consistent slot */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);

	rc = _raid_bdev_add_base_device(raid_bdev, base_bdev->name,
					raid_bdev_base_bdev_slot(base_info),
					base_info->data_offset, base_info->data_size,
					cb_fn, cb_ctx);
	if (rc != 0) {
		SPDK_ERRLOG("base bdev '%s' attach failed: %s\n", base_bdev->name, spdk_strerror(-rc));
		raid_bdev_free_base_bdev_resource(base_info);
	}

	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	return rc;
}
3319 
/*
 * brief:
 * raid_bdev_add_base_device adds a base bdev to the given slot of an existing
 * raid bdev. It also claims the base bdev and keeps the open descriptor.
 * params:
 * raid_bdev - pointer to raid bdev
 * name - name of the base bdev
 * slot - position to add base bdev
 * cb_fn - callback function
 * cb_ctx - argument to callback function
 * returns:
 * 0 - success
 * non zero - failure
 */
int
raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
			  raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	/* data_offset/data_size of 0 are resolved during configuration
	 * (data_size 0 defaults to the remaining base bdev capacity) */
	return _raid_bdev_add_base_device(raid_bdev, name, slot, 0, 0, cb_fn, cb_ctx);
}
3341 
3342 static int
3343 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3344 {
3345 	struct raid_bdev *raid_bdev;
3346 	uint8_t i;
3347 	int rc;
3348 
3349 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3350 			       sb->level, true, &sb->uuid, &raid_bdev);
3351 	if (rc != 0) {
3352 		return rc;
3353 	}
3354 
3355 	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3356 	if (rc != 0) {
3357 		raid_bdev_free(raid_bdev);
3358 		return rc;
3359 	}
3360 
3361 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3362 	memcpy(raid_bdev->sb, sb, sb->length);
3363 
3364 	for (i = 0; i < sb->base_bdevs_size; i++) {
3365 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3366 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3367 
3368 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3369 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3370 			raid_bdev->num_base_bdevs_operational++;
3371 		}
3372 
3373 		base_info->data_offset = sb_base_bdev->data_offset;
3374 		base_info->data_size = sb_base_bdev->data_size;
3375 	}
3376 
3377 	*raid_bdev_out = raid_bdev;
3378 	return 0;
3379 }
3380 
3381 static void
3382 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3383 {
3384 	struct raid_bdev *raid_bdev;
3385 	struct raid_base_bdev_info *base_info;
3386 
3387 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3388 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3389 			if (base_info->desc == NULL && base_info->name != NULL &&
3390 			    strcmp(bdev->name, base_info->name) == 0) {
3391 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3392 				break;
3393 			}
3394 		}
3395 	}
3396 }
3397 
/*
 * Handle a raid superblock found on an examined bdev: match it against an
 * existing raid bdev by uuid (possibly deleting and recreating the raid bdev
 * when a newer superblock version is found), create the raid bdev from the
 * superblock if none exists yet, and configure the examined bdev as the
 * corresponding base bdev.
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
		return;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		return;
	}

	/* raid_bdev is NULL after the loop if no raid bdev with this uuid exists */
	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			break;
		}
	}

	if (raid_bdev) {
		/* NOTE(review): dereferences raid_bdev->sb - assumes a raid bdev whose
		 * uuid matches an on-disk superblock always has superblock support
		 * enabled; confirm this invariant holds for all creation paths. */
		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				return;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* Look up this bdev's slot in the (possibly substituted) superblock by uuid */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		return;
	}

	if (!raid_bdev) {
		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			return;
		}
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		return;
	}

	/* Find the runtime slot matching the bdev's uuid */
	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		return;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
3498 
/* Context for the asynchronous superblock read issued during bdev examine */
struct raid_bdev_examine_ctx {
	struct spdk_bdev_desc *desc;	/* read-only descriptor for the examined bdev */
	struct spdk_io_channel *ch;	/* io channel the superblock read is issued on */
};
3503 
3504 static void
3505 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3506 {
3507 	if (!ctx) {
3508 		return;
3509 	}
3510 
3511 	if (ctx->ch) {
3512 		spdk_put_io_channel(ctx->ch);
3513 	}
3514 
3515 	if (ctx->desc) {
3516 		spdk_bdev_close(ctx->desc);
3517 	}
3518 
3519 	free(ctx);
3520 }
3521 
3522 static void
3523 raid_bdev_examine_load_sb_cb(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3524 {
3525 	struct raid_bdev_examine_ctx *ctx = _ctx;
3526 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3527 
3528 	switch (status) {
3529 	case 0:
3530 		/* valid superblock found */
3531 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3532 		raid_bdev_examine_sb(sb, bdev);
3533 		break;
3534 	case -EINVAL:
3535 		/* no valid superblock, check if it can be claimed anyway */
3536 		raid_bdev_examine_no_sb(bdev);
3537 		break;
3538 	default:
3539 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3540 			    bdev->name, spdk_strerror(-status));
3541 		break;
3542 	}
3543 
3544 	raid_bdev_examine_ctx_free(ctx);
3545 	spdk_bdev_module_examine_done(&g_raid_if);
3546 }
3547 
/* Intentionally empty event callback for the short-lived descriptor opened
 * during examine - no events need to be handled for it. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3552 
/*
 * brief:
 * raid_bdev_examine is the examine callback invoked by lower layers such as
 * bdev_nvme. It checks whether this base bdev can be claimed by a raid bdev.
 * params:
 * bdev - pointer to base bdev
 * returns:
 * none
 */
static void
raid_bdev_examine(struct spdk_bdev *bdev)
{
	struct raid_bdev_examine_ctx *ctx;
	int rc;

	/* Already a member of a raid bdev - nothing to do */
	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
		goto done;
	}

	/* For bdevs with DIF/DIX enabled, skip the superblock read and only try
	 * claiming by name (presumably the superblock is not supported there -
	 * see the dif_supported check in raid_bdev_configure_base_bdev). */
	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		raid_bdev_examine_no_sb(bdev);
		goto done;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    bdev->name, spdk_strerror(ENOMEM));
		goto err;
	}

	/* Open read-only just to read the superblock */
	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, raid_bdev_examine_event_cb, NULL,
				&ctx->desc);
	if (rc) {
		SPDK_ERRLOG("Failed to open bdev %s: %s\n",
			    bdev->name, spdk_strerror(-rc));
		goto err;
	}

	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
	if (!ctx->ch) {
		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev->name);
		goto err;
	}

	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_cb, ctx);
	if (rc) {
		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
			    bdev->name, spdk_strerror(-rc));
		goto err;
	}

	/* The callback will free ctx and complete the examine asynchronously */
	return;
err:
	raid_bdev_examine_ctx_free(ctx);
done:
	spdk_bdev_module_examine_done(&g_raid_if);
}
3612 
3613 /* Log component for bdev raid bdev module */
3614 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3615