xref: /spdk/module/bdev/raid/bdev_raid.c (revision 60241941e6cfa4fc04cfcf6840c79f941ccf85d0)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
16 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
17 #define RAID_BDEV_PROCESS_MAX_QD	16
18 
19 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT	1024
20 #define RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT	0
21 
/* Set once module shutdown begins; makes destruct paths release base bdev resources */
static bool g_shutdown_started = false;

/* List of all raid bdevs */
struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);

/* List of registered raid level modules (one per raid level) */
static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
28 
29 /*
30  * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
31  * contains the relationship of raid bdev io channel with base bdev io channels.
32  */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs, indexed by base bdev slot.
	 * An entry is NULL when the base bdev is missing or is the target of a
	 * running background process (see raid_bdev_create_cb). */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Number of blocks already processed by the background process;
		 * RAID_OFFSET_BLOCKS_INVALID when no process is active */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Shadow channel for I/O to the already processed range - same as
		 * this channel but with the target's channel filled in */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
47 
/* Lifecycle states of a raid bdev background process */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
54 
/* Bandwidth throttling state for a background process.
 * NOTE(review): the fields suggest a token-bucket scheme (bytes replenished
 * per TSC tick, capped at bytes_max) - confirm against the process QoS code,
 * which is outside this chunk. */
struct raid_process_qos {
	bool enable_qos;
	uint64_t last_tsc;
	double bytes_per_tsc;
	double bytes_available;
	double bytes_max;
	struct spdk_poller *process_continue_poller;
};
63 
/* State of one background process (e.g. rebuild) of a raid bdev */
struct raid_bdev_process {
	struct raid_bdev		*raid_bdev;
	enum raid_process_type		type;
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	/* The process's own raid bdev io channel */
	struct raid_bdev_io_channel	*raid_ch;
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	uint64_t			max_window_size;
	uint64_t			window_size;
	uint64_t			window_remaining;
	int				window_status;
	/* Offset in blocks of the current window; channels copy this into
	 * process.offset as the processed/unprocessed boundary */
	uint64_t			window_offset;
	/* True while the current window's block range is locked (quiesced) */
	bool				window_range_locked;
	/* Base bdev the process writes to (e.g. the rebuild target) */
	struct raid_base_bdev_info	*target;
	int				status;
	/* Callbacks to invoke when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
	struct raid_process_qos		qos;
};
82 
/* A single callback queued on a process's finish_actions list */
struct raid_process_finish_action {
	spdk_msg_fn cb;
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
88 
/* Module-wide options, read via raid_bdev_get_opts() and set via raid_bdev_set_opts() */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
	.process_max_bandwidth_mb_sec = RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT,
};
93 
/*
 * brief:
 * Get a copy of the current module-wide raid bdev options.
 * params:
 * opts - output structure; receives a copy of the current options
 * returns:
 * none
 */
void
raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
{
	*opts = g_opts;
}
99 
100 int
101 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
102 {
103 	if (opts->process_window_size_kb == 0) {
104 		return -EINVAL;
105 	}
106 
107 	g_opts = *opts;
108 
109 	return 0;
110 }
111 
112 static struct raid_bdev_module *
113 raid_bdev_module_find(enum raid_level level)
114 {
115 	struct raid_bdev_module *raid_module;
116 
117 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
118 		if (raid_module->level == level) {
119 			return raid_module;
120 		}
121 	}
122 
123 	return NULL;
124 }
125 
126 void
127 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
128 {
129 	if (raid_bdev_module_find(raid_module->level) != NULL) {
130 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
131 			    raid_bdev_level_to_str(raid_module->level));
132 		assert(false);
133 	} else {
134 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
135 	}
136 }
137 
/* Return the IO channel of the base bdev in the given slot. May be NULL when
 * that base bdev is missing or is the target of a running background process
 * (see raid_bdev_create_cb). */
struct spdk_io_channel *
raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
{
	return raid_ch->base_channel[idx];
}
143 
/* Return the module-private context of the raid channel. Only valid for raid
 * modules that provide get_io_channel (asserted). */
void *
raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
{
	assert(raid_ch->module_channel != NULL);

	return spdk_io_channel_get_ctx(raid_ch->module_channel);
}
151 
152 struct raid_base_bdev_info *
153 raid_bdev_channel_get_base_info(struct raid_bdev_io_channel *raid_ch, struct spdk_bdev *base_bdev)
154 {
155 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
156 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
157 	uint8_t i;
158 
159 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
160 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[i];
161 
162 		if (base_info->is_configured &&
163 		    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
164 			return base_info;
165 		}
166 	}
167 
168 	return NULL;
169 }
170 
171 /* Function declarations */
172 static void	raid_bdev_examine(struct spdk_bdev *bdev);
173 static int	raid_bdev_init(void);
174 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
175 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
176 
/*
 * brief:
 * Release the per-channel background process resources set up by
 * raid_bdev_ch_process_setup() and mark the channel as having no active
 * process. Safe to call on a partially initialized channel (it is used on
 * the setup error path).
 * params:
 * raid_ch - raid bdev io channel
 * returns:
 * none
 */
static void
raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
{
	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	if (raid_ch->process.target_ch != NULL) {
		spdk_put_io_channel(raid_ch->process.target_ch);
		raid_ch->process.target_ch = NULL;
	}

	if (raid_ch->process.ch_processed != NULL) {
		/* ch_processed->base_channel entries are either borrowed from raid_ch
		 * or point at target_ch (released above), so only the containers are freed */
		free(raid_ch->process.ch_processed->base_channel);
		free(raid_ch->process.ch_processed);
		raid_ch->process.ch_processed = NULL;
	}
}
193 
/*
 * brief:
 * Set up the background process part of a raid bdev io channel: get a channel
 * for the process target and build a shadow channel (ch_processed) identical
 * to raid_ch except that the target's slot points at the target's channel.
 * I/O to the already processed range is submitted through ch_processed.
 * params:
 * raid_ch - raid bdev io channel being created
 * process - the running background process
 * returns:
 * 0 - success
 * -ENOMEM - failure; any partially acquired resources are released
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	/* Store immediately so the error path can free it */
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* Borrow the parent channel's base channels, substituting the target's
	 * channel in the target's slot */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The shadow channel itself never has an active process */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
243 
244 /*
245  * brief:
246  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
247  * hierarchy from raid bdev to base bdev io channels. It will be called per core
248  * params:
249  * io_device - pointer to raid bdev io device represented by raid_bdev
250  * ctx_buf - pointer to context buffer for raid bdev io channel
251  * returns:
252  * 0 - success
253  * non zero - failure
254  */
255 static int
256 raid_bdev_create_cb(void *io_device, void *ctx_buf)
257 {
258 	struct raid_bdev            *raid_bdev = io_device;
259 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
260 	uint8_t i;
261 	int ret = -ENOMEM;
262 
263 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);
264 
265 	assert(raid_bdev != NULL);
266 	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
267 
268 	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
269 	if (!raid_ch->base_channel) {
270 		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
271 		return -ENOMEM;
272 	}
273 
274 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
275 		/*
276 		 * Get the spdk_io_channel for all the base bdevs. This is used during
277 		 * split logic to send the respective child bdev ios to respective base
278 		 * bdev io channel.
279 		 * Skip missing base bdevs and the process target, which should also be treated as
280 		 * missing until the process completes.
281 		 */
282 		if (raid_bdev->base_bdev_info[i].is_configured == false ||
283 		    raid_bdev->base_bdev_info[i].is_process_target == true) {
284 			continue;
285 		}
286 		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
287 						   raid_bdev->base_bdev_info[i].desc);
288 		if (!raid_ch->base_channel[i]) {
289 			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
290 			goto err;
291 		}
292 	}
293 
294 	if (raid_bdev->module->get_io_channel) {
295 		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
296 		if (!raid_ch->module_channel) {
297 			SPDK_ERRLOG("Unable to create io channel for raid module\n");
298 			goto err;
299 		}
300 	}
301 
302 	if (raid_bdev->process != NULL) {
303 		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
304 		if (ret != 0) {
305 			SPDK_ERRLOG("Failed to setup process io channel\n");
306 			goto err;
307 		}
308 	} else {
309 		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
310 	}
311 
312 	return 0;
313 err:
314 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
315 		if (raid_ch->base_channel[i] != NULL) {
316 			spdk_put_io_channel(raid_ch->base_channel[i]);
317 		}
318 	}
319 	free(raid_ch->base_channel);
320 
321 	raid_bdev_ch_process_cleanup(raid_ch);
322 
323 	return ret;
324 }
325 
326 /*
327  * brief:
328  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
329  * hierarchy from raid bdev to base bdev io channels. It will be called per core
330  * params:
331  * io_device - pointer to raid bdev io device represented by raid_bdev
332  * ctx_buf - pointer to context buffer for raid bdev io channel
333  * returns:
334  * none
335  */
336 static void
337 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
338 {
339 	struct raid_bdev *raid_bdev = io_device;
340 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
341 	uint8_t i;
342 
343 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
344 
345 	assert(raid_ch != NULL);
346 	assert(raid_ch->base_channel);
347 
348 	if (raid_ch->module_channel) {
349 		spdk_put_io_channel(raid_ch->module_channel);
350 	}
351 
352 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
353 		/* Free base bdev channels */
354 		if (raid_ch->base_channel[i] != NULL) {
355 			spdk_put_io_channel(raid_ch->base_channel[i]);
356 		}
357 	}
358 	free(raid_ch->base_channel);
359 	raid_ch->base_channel = NULL;
360 
361 	raid_bdev_ch_process_cleanup(raid_ch);
362 }
363 
364 /*
365  * brief:
366  * raid_bdev_cleanup is used to cleanup raid_bdev related data
367  * structures.
368  * params:
369  * raid_bdev - pointer to raid_bdev
370  * returns:
371  * none
372  */
373 static void
374 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
375 {
376 	struct raid_base_bdev_info *base_info;
377 
378 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
379 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
380 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
381 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
382 
383 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
384 		assert(base_info->desc == NULL);
385 		free(base_info->name);
386 	}
387 
388 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
389 }
390 
/* Free the raid_bdev structure and the memory it owns (superblock, base bdev
 * info array, bdev name). Callers must have run raid_bdev_cleanup() first. */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_free_superblock(raid_bdev);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
399 
/* Convenience wrapper: detach the raid bdev from global state, then free it */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
406 
407 static void
408 raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
409 {
410 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
411 
412 	assert(base_info->is_configured);
413 	assert(raid_bdev->num_base_bdevs_discovered);
414 	raid_bdev->num_base_bdevs_discovered--;
415 	base_info->is_configured = false;
416 	base_info->is_process_target = false;
417 }
418 
419 /*
420  * brief:
421  * free resource of base bdev for raid bdev
422  * params:
423  * base_info - raid base bdev info
424  * returns:
425  * none
426  */
427 static void
428 raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
429 {
430 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
431 
432 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
433 	assert(base_info->configure_cb == NULL);
434 
435 	free(base_info->name);
436 	base_info->name = NULL;
437 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
438 		spdk_uuid_set_null(&base_info->uuid);
439 	}
440 	base_info->is_failed = false;
441 
442 	/* clear `data_offset` to allow it to be recalculated during configuration */
443 	base_info->data_offset = 0;
444 
445 	if (base_info->desc == NULL) {
446 		return;
447 	}
448 
449 	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
450 	spdk_bdev_close(base_info->desc);
451 	base_info->desc = NULL;
452 	spdk_put_io_channel(base_info->app_thread_ch);
453 	base_info->app_thread_ch = NULL;
454 
455 	if (base_info->is_configured) {
456 		raid_bdev_deconfigure_base_bdev(base_info);
457 	}
458 }
459 
/*
 * brief:
 * Completion callback of spdk_io_device_unregister() during destruct. Signals
 * destruct completion to the bdev layer and, if no base bdevs remain, frees
 * the raid_bdev entirely.
 * params:
 * io_device - the raid_bdev registered as an io device
 * returns:
 * none
 */
static void
raid_bdev_io_device_unregister_cb(void *io_device)
{
	struct raid_bdev *raid_bdev = io_device;

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* Free raid_bdev when there are no base bdevs left */
		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
		/* cleanup must precede destruct_done; free must come last */
		raid_bdev_cleanup(raid_bdev);
		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
		raid_bdev_free(raid_bdev);
	} else {
		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
	}
}
475 
476 void
477 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
478 {
479 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
480 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
481 	}
482 }
483 
/*
 * brief:
 * App-thread part of raid bdev destruct: releases base bdev resources where
 * appropriate, lets the raid module perform (possibly asynchronous) stop
 * handling, and finishes via raid_bdev_module_stop_done().
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * none
 */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	if (raid_bdev->module->stop != NULL) {
		/* A false return means the module stops asynchronously and will call
		 * raid_bdev_module_stop_done() itself later */
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
516 
/*
 * brief:
 * destruct callback of the raid bdev function table. Defers the actual work
 * to the app thread.
 * params:
 * ctx - pointer to raid_bdev
 * returns:
 * 1 - destruct is asynchronous; completion is signaled via
 *     spdk_bdev_destruct_done() (see raid_bdev_io_device_unregister_cb)
 */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
524 
525 int
526 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
527 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
528 {
529 	struct spdk_dif_ctx dif_ctx;
530 	struct spdk_dif_error err_blk = {};
531 	int rc;
532 	struct spdk_dif_ctx_init_ext_opts dif_opts;
533 	struct iovec md_iov = {
534 		.iov_base	= md_buf,
535 		.iov_len	= num_blocks * bdev->md_len,
536 	};
537 
538 	if (md_buf == NULL) {
539 		return 0;
540 	}
541 
542 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
543 	dif_opts.dif_pi_format = bdev->dif_pi_format;
544 	rc = spdk_dif_ctx_init(&dif_ctx,
545 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
546 			       bdev->dif_is_head_of_md, bdev->dif_type,
547 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
548 			       0, 0, 0, 0, 0, &dif_opts);
549 	if (rc != 0) {
550 		SPDK_ERRLOG("Initialization of DIF context failed\n");
551 		return rc;
552 	}
553 
554 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
555 
556 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
557 	if (rc != 0) {
558 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
559 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
560 	}
561 
562 	return rc;
563 }
564 
565 int
566 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
567 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
568 {
569 	struct spdk_dif_ctx dif_ctx;
570 	struct spdk_dif_error err_blk = {};
571 	int rc;
572 	struct spdk_dif_ctx_init_ext_opts dif_opts;
573 	struct iovec md_iov = {
574 		.iov_base	= md_buf,
575 		.iov_len	= num_blocks * bdev->md_len,
576 	};
577 
578 	if (md_buf == NULL) {
579 		return 0;
580 	}
581 
582 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
583 	dif_opts.dif_pi_format = bdev->dif_pi_format;
584 	rc = spdk_dif_ctx_init(&dif_ctx,
585 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
586 			       bdev->dif_is_head_of_md, bdev->dif_type,
587 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
588 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
589 	if (rc != 0) {
590 		SPDK_ERRLOG("Initialization of DIF context failed\n");
591 		return rc;
592 	}
593 
594 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
595 	if (rc != 0) {
596 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
597 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
598 	}
599 
600 	return rc;
601 }
602 
/*
 * brief:
 * Complete a raid I/O. For split I/Os (see raid_bdev_io_split), completion of
 * the first (upper) part triggers submission of the second (lower, already
 * processed) part; only when both are done - or on failure - is the original
 * request restored and completed. For plain reads with reftag checking
 * enabled, the reference tags are remapped back before completion.
 * params:
 * raid_io - pointer to raid_bdev_io
 * status - completion status
 * returns:
 * none
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original starting offset and md_buf for the second part */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* raid_io->iovs still points into the original array, so the
				 * difference is the number of iovs fully consumed by part one */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The boundary iov is shared by both parts; shrink it to
					 * cover only the part-two portion */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* Part two targets the already processed range */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Both parts done (or part one failed): restore the original request */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
666 
667 /*
668  * brief:
669  * raid_bdev_io_complete_part - signal the completion of a part of the expected
670  * base bdev IOs and complete the raid_io if this is the final expected IO.
671  * The caller should first set raid_io->base_bdev_io_remaining. This function
672  * will decrement this counter by the value of the 'completed' parameter and
673  * complete the raid_io if the counter reaches 0. The caller is free to
674  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
675  * it can represent e.g. blocks or IOs.
676  * params:
677  * raid_io - pointer to raid_bdev_io
678  * completed - the part of the raid_io that has been completed
679  * status - status of the base IO
680  * returns:
681  * true - if the raid_io is completed
682  * false - otherwise
683  */
684 bool
685 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
686 			   enum spdk_bdev_io_status status)
687 {
688 	assert(raid_io->base_bdev_io_remaining >= completed);
689 	raid_io->base_bdev_io_remaining -= completed;
690 
691 	if (status != raid_io->base_bdev_io_status_default) {
692 		raid_io->base_bdev_io_status = status;
693 	}
694 
695 	if (raid_io->base_bdev_io_remaining == 0) {
696 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
697 		return true;
698 	} else {
699 		return false;
700 	}
701 }
702 
703 /*
704  * brief:
705  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
706  * It will try to queue the IOs after storing the context to bdev wait queue logic.
707  * params:
708  * raid_io - pointer to raid_bdev_io
709  * bdev - the block device that the IO is submitted to
710  * ch - io channel
711  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
712  * returns:
713  * none
714  */
715 void
716 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
717 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
718 {
719 	raid_io->waitq_entry.bdev = bdev;
720 	raid_io->waitq_entry.cb_fn = cb_fn;
721 	raid_io->waitq_entry.cb_arg = raid_io;
722 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
723 }
724 
725 static void
726 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
727 {
728 	struct raid_bdev_io *raid_io = cb_arg;
729 
730 	spdk_bdev_free_io(bdev_io);
731 
732 	raid_bdev_io_complete_part(raid_io, 1, success ?
733 				   SPDK_BDEV_IO_STATUS_SUCCESS :
734 				   SPDK_BDEV_IO_STATUS_FAILED);
735 }
736 
737 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
738 
/* spdk_bdev_io_wait_cb adapter for retrying a queued reset submission */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	raid_bdev_submit_reset_request((struct raid_bdev_io *)_raid_io);
}
746 
747 /*
748  * brief:
749  * raid_bdev_submit_reset_request function submits reset requests
750  * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
751  * which case it will queue it for later submission
752  * params:
753  * raid_io
754  * returns:
755  * none
756  */
757 static void
758 raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
759 {
760 	struct raid_bdev		*raid_bdev;
761 	int				ret;
762 	uint8_t				i;
763 	struct raid_base_bdev_info	*base_info;
764 	struct spdk_io_channel		*base_ch;
765 
766 	raid_bdev = raid_io->raid_bdev;
767 
768 	if (raid_io->base_bdev_io_remaining == 0) {
769 		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
770 	}
771 
772 	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
773 		base_info = &raid_bdev->base_bdev_info[i];
774 		base_ch = raid_io->raid_ch->base_channel[i];
775 		if (base_ch == NULL) {
776 			raid_io->base_bdev_io_submitted++;
777 			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
778 			continue;
779 		}
780 		ret = spdk_bdev_reset(base_info->desc, base_ch,
781 				      raid_base_bdev_reset_complete, raid_io);
782 		if (ret == 0) {
783 			raid_io->base_bdev_io_submitted++;
784 		} else if (ret == -ENOMEM) {
785 			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
786 						base_ch, _raid_bdev_submit_reset_request);
787 			return;
788 		} else {
789 			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
790 			assert(false);
791 			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
792 			return;
793 		}
794 	}
795 }
796 
/*
 * brief:
 * Split a raid I/O at split_offset (blocks, relative to the I/O start) and
 * adjust the raid_io to describe only the upper part [split_offset, end).
 * The state needed to later restore/submit the lower part is saved in
 * raid_io->split and consumed by raid_bdev_io_complete().
 * params:
 * raid_io - pointer to raid_bdev_io
 * split_offset - block offset within the I/O at which to split; must be > 0
 * returns:
 * none
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Walk the iovs to find where the split lands */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iov boundary - nothing to modify */
				raid_io->split.iov = NULL;
			} else {
				/* Split falls inside this iov: save a copy for restoration and
				 * trim it to the upper part */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
834 
/*
 * brief:
 * Submit a read/write raid I/O, accounting for an active background process:
 * I/O entirely below the process offset goes through the processed shadow
 * channel; I/O straddling the offset is split so the unprocessed part runs
 * first.
 * params:
 * raid_io - pointer to raid_bdev_io
 * returns:
 * none
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
867 
868 /*
869  * brief:
870  * Callback function to spdk_bdev_io_get_buf.
871  * params:
872  * ch - pointer to raid bdev io channel
873  * bdev_io - pointer to parent bdev_io on raid bdev device
874  * success - True if buffer is allocated or false otherwise.
875  * returns:
876  * none
877  */
878 static void
879 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
880 		     bool success)
881 {
882 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
883 
884 	if (!success) {
885 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
886 		return;
887 	}
888 
889 	raid_bdev_submit_rw_request(raid_io);
890 }
891 
/*
 * brief:
 * Initialize a raid_bdev_io before submission: record the request parameters
 * and reset the per-submission bookkeeping (remaining/submitted counters,
 * completion callback, split state, default status).
 * params:
 * raid_io - the raid io to initialize
 * raid_ch - raid io channel the io is submitted on
 * type - io type
 * offset_blocks/num_blocks - io range
 * iovs/iovcnt/md_buf - data and metadata buffers
 * memory_domain/memory_domain_ctx - optional memory domain of the buffers
 * returns:
 * none
 */
void
raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
{
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);

	raid_io->type = type;
	raid_io->offset_blocks = offset_blocks;
	raid_io->num_blocks = num_blocks;
	raid_io->iovs = iovs;
	raid_io->iovcnt = iovcnt;
	raid_io->memory_domain = memory_domain;
	raid_io->memory_domain_ctx = memory_domain_ctx;
	raid_io->md_buf = md_buf;

	raid_io->raid_bdev = raid_bdev;
	raid_io->raid_ch = raid_ch;
	raid_io->base_bdev_io_remaining = 0;
	raid_io->base_bdev_io_submitted = 0;
	raid_io->completion_cb = NULL;
	/* Marks the io as not split */
	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;

	raid_bdev_io_set_default_status(raid_io, SPDK_BDEV_IO_STATUS_SUCCESS);
}
919 
920 /*
921  * brief:
922  * raid_bdev_submit_request function is the submit_request function pointer of
923  * raid bdev function table. This is used to submit the io on raid_bdev to below
924  * layers.
925  * params:
926  * ch - pointer to raid bdev io channel
927  * bdev_io - pointer to parent bdev_io on raid bdev device
928  * returns:
929  * none
930  */
931 static void
932 raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
933 {
934 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
935 
936 	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
937 			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
938 			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
939 			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);
940 
941 	switch (bdev_io->type) {
942 	case SPDK_BDEV_IO_TYPE_READ:
943 		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
944 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
945 		break;
946 	case SPDK_BDEV_IO_TYPE_WRITE:
947 		raid_bdev_submit_rw_request(raid_io);
948 		break;
949 
950 	case SPDK_BDEV_IO_TYPE_RESET:
951 		raid_bdev_submit_reset_request(raid_io);
952 		break;
953 
954 	case SPDK_BDEV_IO_TYPE_FLUSH:
955 	case SPDK_BDEV_IO_TYPE_UNMAP:
956 		if (raid_io->raid_bdev->process != NULL) {
957 			/* TODO: rebuild support */
958 			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
959 			return;
960 		}
961 		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
962 		break;
963 
964 	default:
965 		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
966 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
967 		break;
968 	}
969 }
970 
971 /*
972  * brief:
973  * _raid_bdev_io_type_supported checks whether io_type is supported in
974  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
975  * doesn't support, the raid device doesn't supports.
976  *
977  * params:
978  * raid_bdev - pointer to raid bdev context
979  * io_type - io type
980  * returns:
981  * true - io_type is supported
982  * false - io_type is not supported
983  */
984 inline static bool
985 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
986 {
987 	struct raid_base_bdev_info *base_info;
988 
989 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
990 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
991 		if (raid_bdev->module->submit_null_payload_request == NULL) {
992 			return false;
993 		}
994 	}
995 
996 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
997 		if (base_info->desc == NULL) {
998 			continue;
999 		}
1000 
1001 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
1002 			return false;
1003 		}
1004 	}
1005 
1006 	return true;
1007 }
1008 
1009 /*
1010  * brief:
1011  * raid_bdev_io_type_supported is the io_supported function for bdev function
1012  * table which returns whether the particular io type is supported or not by
1013  * raid bdev module
1014  * params:
1015  * ctx - pointer to raid bdev context
1016  * type - io type
1017  * returns:
1018  * true - io_type is supported
1019  * false - io_type is not supported
1020  */
1021 static bool
1022 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1023 {
1024 	switch (io_type) {
1025 	case SPDK_BDEV_IO_TYPE_READ:
1026 	case SPDK_BDEV_IO_TYPE_WRITE:
1027 		return true;
1028 
1029 	case SPDK_BDEV_IO_TYPE_FLUSH:
1030 	case SPDK_BDEV_IO_TYPE_RESET:
1031 	case SPDK_BDEV_IO_TYPE_UNMAP:
1032 		return _raid_bdev_io_type_supported(ctx, io_type);
1033 
1034 	default:
1035 		return false;
1036 	}
1037 
1038 	return false;
1039 }
1040 
1041 /*
1042  * brief:
1043  * raid_bdev_get_io_channel is the get_io_channel function table pointer for
1044  * raid bdev. This is used to return the io channel for this raid bdev
1045  * params:
1046  * ctxt - pointer to raid_bdev
1047  * returns:
1048  * pointer to io channel for raid bdev
1049  */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev pointer itself is the io_device key registered with
	 * spdk_io_device_register(), so it can be passed straight through. */
	return spdk_get_io_channel((struct raid_bdev *)ctxt);
}
1057 
/*
 * brief:
 * raid_bdev_write_info_json writes the raid bdev's configuration and state -
 * including any running background process and the base bdev list - as named
 * members of the JSON object currently open in 'w'.
 * params:
 * raid_bdev - pointer to raid bdev
 * w - pointer to json write context
 * returns:
 * none
 */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;

	assert(raid_bdev != NULL);
	/* Raid bdev state may only be read from the app thread. */
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		/* A background process (e.g. rebuild) is running - report its
		 * type, target and progress. */
		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			/* The base bdev in this slot has no known name. */
			spdk_json_write_null(w);
		}
		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
1107 
1108 /*
1109  * brief:
1110  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1111  * params:
1112  * ctx - pointer to raid_bdev
1113  * w - pointer to json context
1114  * returns:
1115  * 0 - success
1116  * non zero - failure
1117  */
1118 static int
1119 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1120 {
1121 	struct raid_bdev *raid_bdev = ctx;
1122 
1123 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1124 
1125 	/* Dump the raid bdev configuration related information */
1126 	spdk_json_write_named_object_begin(w, "raid");
1127 	raid_bdev_write_info_json(raid_bdev, w);
1128 	spdk_json_write_object_end(w);
1129 
1130 	return 0;
1131 }
1132 
1133 /*
1134  * brief:
1135  * raid_bdev_write_config_json is the function table pointer for raid bdev
1136  * params:
1137  * bdev - pointer to spdk_bdev
1138  * w - pointer to json context
1139  * returns:
1140  * none
1141  */
static void
raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct raid_bdev *raid_bdev = bdev->ctxt;
	struct raid_base_bdev_info *base_info;

	/* Config generation happens on the app thread only. */
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->superblock_enabled) {
		/* raid bdev configuration is stored in the superblock */
		return;
	}

	/* Emit a "bdev_raid_create" RPC call that would recreate this bdev. */
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	if (raid_bdev->strip_size_kb != 0) {
		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	}
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));

	spdk_json_write_named_array_begin(w, "base_bdevs");
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			/* Slot has no name; emit a per-slot placeholder instead. */
			char str[32];

			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
			spdk_json_write_string(w, str);
		}
	}
	spdk_json_write_array_end(w);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1183 
/*
 * brief:
 * raid_bdev_get_memory_domains collects the memory domains of all configured
 * base bdevs. Uses the standard two-pass protocol: first count the domains,
 * then fill the caller's array only if it is large enough.
 * params:
 * ctx - pointer to raid_bdev
 * domains - caller's array to fill (may be NULL to query the count)
 * array_size - capacity of 'domains'
 * returns:
 * total number of memory domains, or negative errno on failure
 */
static int
raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_base_bdev_info *base_info;
	int domains_count = 0, rc = 0;

	if (raid_bdev->module->memory_domains_supported == false) {
		return 0;
	}

	/* First loop to get the number of memory domains */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->is_configured == false) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
		if (rc < 0) {
			return rc;
		}
		domains_count += rc;
	}

	/* Array too small (or absent): report the required count only. */
	if (!domains || array_size < domains_count) {
		return domains_count;
	}

	/* Second loop fills the array, advancing past each base bdev's entries. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->is_configured == false) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
		if (rc < 0) {
			return rc;
		}
		domains += rc;
		array_size -= rc;
	}

	return domains_count;
}
1225 
1226 /* g_raid_bdev_fn_table is the function table for raid bdev */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	/* write_config_json is a no-op when the config lives in the superblock. */
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1236 
1237 struct raid_bdev *
1238 raid_bdev_find_by_name(const char *name)
1239 {
1240 	struct raid_bdev *raid_bdev;
1241 
1242 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1243 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1244 			return raid_bdev;
1245 		}
1246 	}
1247 
1248 	return NULL;
1249 }
1250 
1251 static struct raid_bdev *
1252 raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1253 {
1254 	struct raid_bdev *raid_bdev;
1255 
1256 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1257 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1258 			return raid_bdev;
1259 		}
1260 	}
1261 
1262 	return NULL;
1263 }
1264 
/* Mapping between raid level names accepted from users (both the "raidX" and
 * the bare "X" spellings) and the enum values; terminated by a zeroed entry. */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};

/* Printable raid bdev state names, indexed by enum raid_bdev_state. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};

/* Printable background process type names, indexed by enum raid_process_type. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};

/* We have to use the typedef in the function declaration to appease astyle. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1295 
1296 raid_level_t
1297 raid_bdev_str_to_level(const char *str)
1298 {
1299 	unsigned int i;
1300 
1301 	assert(str != NULL);
1302 
1303 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1304 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1305 			return g_raid_level_names[i].value;
1306 		}
1307 	}
1308 
1309 	return INVALID_RAID_LEVEL;
1310 }
1311 
1312 const char *
1313 raid_bdev_level_to_str(enum raid_level level)
1314 {
1315 	unsigned int i;
1316 
1317 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1318 		if (g_raid_level_names[i].value == level) {
1319 			return g_raid_level_names[i].name;
1320 		}
1321 	}
1322 
1323 	return "";
1324 }
1325 
1326 raid_bdev_state_t
1327 raid_bdev_str_to_state(const char *str)
1328 {
1329 	unsigned int i;
1330 
1331 	assert(str != NULL);
1332 
1333 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1334 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1335 			break;
1336 		}
1337 	}
1338 
1339 	return i;
1340 }
1341 
1342 const char *
1343 raid_bdev_state_to_str(enum raid_bdev_state state)
1344 {
1345 	if (state >= RAID_BDEV_STATE_MAX) {
1346 		return "";
1347 	}
1348 
1349 	return g_raid_state_names[state];
1350 }
1351 
1352 const char *
1353 raid_bdev_process_to_str(enum raid_process_type value)
1354 {
1355 	if (value >= RAID_PROCESS_MAX) {
1356 		return "";
1357 	}
1358 
1359 	return g_raid_process_type_names[value];
1360 }
1361 
1362 /*
1363  * brief:
1364  * raid_bdev_fini_start is called when bdev layer is starting the
1365  * shutdown process
1366  * params:
1367  * none
1368  * returns:
1369  * none
1370  */
1371 static void
1372 raid_bdev_fini_start(void)
1373 {
1374 	struct raid_bdev *raid_bdev;
1375 	struct raid_base_bdev_info *base_info;
1376 
1377 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1378 
1379 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1380 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1381 			RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1382 				raid_bdev_free_base_bdev_resource(base_info);
1383 			}
1384 		}
1385 	}
1386 
1387 	g_shutdown_started = true;
1388 }
1389 
1390 /*
1391  * brief:
1392  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1393  * params:
1394  * none
1395  * returns:
1396  * none
1397  */
1398 static void
1399 raid_bdev_exit(void)
1400 {
1401 	struct raid_bdev *raid_bdev, *tmp;
1402 
1403 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1404 
1405 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1406 		raid_bdev_cleanup_and_free(raid_bdev);
1407 	}
1408 }
1409 
/* Emit a "bdev_raid_set_options" RPC call that restores the current
 * module-level options on config replay. */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_named_uint32(w, "process_max_bandwidth_mb_sec",
				     g_opts.process_max_bandwidth_mb_sec);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1425 
/* Module-level config_json hook: only the global raid options need to be
 * emitted here; per-bdev config goes through the bdev fn_table. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);
	return 0;
}
1433 
1434 /*
1435  * brief:
1436  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1437  * module
1438  * params:
1439  * none
1440  * returns:
1441  * size of spdk_bdev_io context for raid
1442  */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	/* Each spdk_bdev_io carries a raid_bdev_io in its driver_ctx area. */
	return sizeof(struct raid_bdev_io);
}
1449 
/* Bdev module descriptor registering the raid module with the bdev layer. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	/* examine_disk lets raid claim base bdevs as they appear. */
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1462 
1463 /*
1464  * brief:
1465  * raid_bdev_init is the initialization function for raid bdev module
1466  * params:
1467  * none
1468  * returns:
1469  * 0 - success
1470  * non zero - failure
1471  */
static int
raid_bdev_init(void)
{
	/* No global initialization is currently required. */
	return 0;
}
1477 
1478 static int
1479 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1480 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1481 		  struct raid_bdev **raid_bdev_out)
1482 {
1483 	struct raid_bdev *raid_bdev;
1484 	struct spdk_bdev *raid_bdev_gen;
1485 	struct raid_bdev_module *module;
1486 	struct raid_base_bdev_info *base_info;
1487 	uint8_t min_operational;
1488 
1489 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1490 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1491 		return -EINVAL;
1492 	}
1493 
1494 	if (raid_bdev_find_by_name(name) != NULL) {
1495 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1496 		return -EEXIST;
1497 	}
1498 
1499 	if (level == RAID1) {
1500 		if (strip_size != 0) {
1501 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1502 			return -EINVAL;
1503 		}
1504 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1505 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1506 		return -EINVAL;
1507 	}
1508 
1509 	module = raid_bdev_module_find(level);
1510 	if (module == NULL) {
1511 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1512 		return -EINVAL;
1513 	}
1514 
1515 	assert(module->base_bdevs_min != 0);
1516 	if (num_base_bdevs < module->base_bdevs_min) {
1517 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1518 			    module->base_bdevs_min,
1519 			    raid_bdev_level_to_str(level));
1520 		return -EINVAL;
1521 	}
1522 
1523 	switch (module->base_bdevs_constraint.type) {
1524 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1525 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1526 		break;
1527 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1528 		min_operational = module->base_bdevs_constraint.value;
1529 		break;
1530 	case CONSTRAINT_UNSET:
1531 		if (module->base_bdevs_constraint.value != 0) {
1532 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1533 				    (uint8_t)module->base_bdevs_constraint.value, name);
1534 			return -EINVAL;
1535 		}
1536 		min_operational = num_base_bdevs;
1537 		break;
1538 	default:
1539 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1540 			    (uint8_t)module->base_bdevs_constraint.type,
1541 			    raid_bdev_level_to_str(module->level));
1542 		return -EINVAL;
1543 	};
1544 
1545 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1546 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1547 			    raid_bdev_level_to_str(module->level));
1548 		return -EINVAL;
1549 	}
1550 
1551 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1552 	if (!raid_bdev) {
1553 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1554 		return -ENOMEM;
1555 	}
1556 
1557 	raid_bdev->module = module;
1558 	raid_bdev->num_base_bdevs = num_base_bdevs;
1559 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1560 					   sizeof(struct raid_base_bdev_info));
1561 	if (!raid_bdev->base_bdev_info) {
1562 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1563 		raid_bdev_free(raid_bdev);
1564 		return -ENOMEM;
1565 	}
1566 
1567 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1568 		base_info->raid_bdev = raid_bdev;
1569 	}
1570 
1571 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1572 	 * internally and set later.
1573 	 */
1574 	raid_bdev->strip_size = 0;
1575 	raid_bdev->strip_size_kb = strip_size;
1576 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1577 	raid_bdev->level = level;
1578 	raid_bdev->min_base_bdevs_operational = min_operational;
1579 	raid_bdev->superblock_enabled = superblock_enabled;
1580 
1581 	raid_bdev_gen = &raid_bdev->bdev;
1582 
1583 	raid_bdev_gen->name = strdup(name);
1584 	if (!raid_bdev_gen->name) {
1585 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1586 		raid_bdev_free(raid_bdev);
1587 		return -ENOMEM;
1588 	}
1589 
1590 	raid_bdev_gen->product_name = "Raid Volume";
1591 	raid_bdev_gen->ctxt = raid_bdev;
1592 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1593 	raid_bdev_gen->module = &g_raid_if;
1594 	raid_bdev_gen->write_cache = 0;
1595 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1596 
1597 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1598 
1599 	*raid_bdev_out = raid_bdev;
1600 
1601 	return 0;
1602 }
1603 
1604 /*
1605  * brief:
1606  * raid_bdev_create allocates raid bdev based on passed configuration
1607  * params:
1608  * name - name for raid bdev
1609  * strip_size - strip size in KB
1610  * num_base_bdevs - number of base bdevs
1611  * level - raid level
1612  * superblock_enabled - true if raid should have superblock
1613  * uuid - uuid to set for the bdev
1614  * raid_bdev_out - the created raid bdev
1615  * returns:
1616  * 0 - success
1617  * non zero - failure
1618  */
int
raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
		 struct raid_bdev **raid_bdev_out)
{
	struct raid_bdev *raid_bdev;
	int rc;

	assert(uuid != NULL);

	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
			       &raid_bdev);
	if (rc != 0) {
		return rc;
	}

	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
		/* we need to have the uuid to store in the superblock before the bdev is registered */
		spdk_uuid_generate(&raid_bdev->bdev.uuid);
	}

	/* Initially all base bdevs are counted as operational. */
	raid_bdev->num_base_bdevs_operational = num_base_bdevs;

	*raid_bdev_out = raid_bdev;

	return 0;
}
1646 
1647 static void
1648 _raid_bdev_unregistering_cont(void *ctx)
1649 {
1650 	struct raid_bdev *raid_bdev = ctx;
1651 
1652 	spdk_bdev_close(raid_bdev->self_desc);
1653 	raid_bdev->self_desc = NULL;
1654 }
1655 
static void
raid_bdev_unregistering_cont(void *ctx)
{
	/* The internal descriptor must be closed on the app thread. */
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1661 
1662 static int
1663 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1664 {
1665 	struct raid_process_finish_action *finish_action;
1666 
1667 	assert(spdk_get_thread() == process->thread);
1668 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1669 
1670 	finish_action = calloc(1, sizeof(*finish_action));
1671 	if (finish_action == NULL) {
1672 		return -ENOMEM;
1673 	}
1674 
1675 	finish_action->cb = cb;
1676 	finish_action->cb_ctx = cb_ctx;
1677 
1678 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1679 
1680 	return 0;
1681 }
1682 
/* Runs on the process thread (sent from raid_bdev_event_cb): request the
 * background process to stop and arrange for the internal descriptor to be
 * closed once the process has finished. */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	/* Preserve an earlier failure status; otherwise record cancellation. */
	if (process->status == 0) {
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1701 
1702 static void
1703 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1704 {
1705 	struct raid_bdev *raid_bdev = event_ctx;
1706 
1707 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1708 		if (raid_bdev->process != NULL) {
1709 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1710 					     raid_bdev->process);
1711 		} else {
1712 			raid_bdev_unregistering_cont(raid_bdev);
1713 		}
1714 	}
1715 }
1716 
/* Final stage of configuration: register the io_device and the bdev, open an
 * internal descriptor, and report the result through configure_cb (if set).
 * On any failure the module is stopped and the bdev returns to CONFIGURING. */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	/* The raid_bdev pointer is the io_device key; per-thread channels are
	 * created/destroyed through raid_bdev_create_cb/raid_bdev_destroy_cb. */
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto out;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto out;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
out:
	if (rc != 0) {
		/* Roll back: stop the module, drop the io_device registration and
		 * return to CONFIGURING state. */
		if (raid_bdev->module->stop != NULL) {
			raid_bdev->module->stop(raid_bdev);
		}
		spdk_io_device_unregister(raid_bdev, NULL);
		raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
	}

	if (raid_bdev->configure_cb != NULL) {
		raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, rc);
		raid_bdev->configure_cb = NULL;
	}
}
1771 
/* Completion callback for the initial superblock write issued from
 * raid_bdev_configure(). On success, configuration continues; on failure the
 * module is stopped and configure_cb reports the error. */
static void
raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	if (status == 0) {
		raid_bdev_configure_cont(raid_bdev);
	} else {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		if (raid_bdev->module->stop != NULL) {
			raid_bdev->module->stop(raid_bdev);
		}
		if (raid_bdev->configure_cb != NULL) {
			raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, status);
			raid_bdev->configure_cb = NULL;
		}
	}
}
1789 
1790 /*
1791  * brief:
1792  * If raid bdev config is complete, then only register the raid bdev to
1793  * bdev layer and remove this raid bdev from configuring list and
1794  * insert the raid bdev to configured list
1795  * params:
1796  * raid_bdev - pointer to raid bdev
1797  * returns:
1798  * 0 - success
1799  * non zero - failure
1800  */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev, raid_bdev_configure_cb cb, void *cb_ctx)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);

	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	/* The callback fires later, from raid_bdev_configure_cont() or from the
	 * superblock write completion, depending on the path taken below. */
	assert(raid_bdev->configure_cb == NULL);
	raid_bdev->configure_cb = cb;
	raid_bdev->configure_cb_ctx = cb_ctx;

	if (raid_bdev->superblock_enabled) {
		if (raid_bdev->sb == NULL) {
			/* No superblock yet: allocate and initialize a fresh one. */
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			/* A superblock already exists: sanity-check it against the
			 * assembled bdev geometry before rewriting it. */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			/* Undo the callback registration and module start. */
			raid_bdev->configure_cb = NULL;
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1864 
1865 /*
1866  * brief:
1867  * If raid bdev is online and registered, change the bdev state to
1868  * configuring and unregister this raid device. Queue this raid device
1869  * in configuring list
1870  * params:
1871  * raid_bdev - pointer to raid bdev
1872  * cb_fn - callback function
1873  * cb_arg - argument to callback function
1874  * returns:
1875  * none
1876  */
1877 static void
1878 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1879 		      void *cb_arg)
1880 {
1881 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1882 		if (cb_fn) {
1883 			cb_fn(cb_arg, 0);
1884 		}
1885 		return;
1886 	}
1887 
1888 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1889 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1890 
1891 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1892 }
1893 
1894 /*
1895  * brief:
1896  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1897  * params:
1898  * base_bdev - pointer to base bdev
1899  * returns:
1900  * base bdev info if found, otherwise NULL.
1901  */
1902 static struct raid_base_bdev_info *
1903 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1904 {
1905 	struct raid_bdev *raid_bdev;
1906 	struct raid_base_bdev_info *base_info;
1907 
1908 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1909 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1910 			if (base_info->desc != NULL &&
1911 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1912 				return base_info;
1913 			}
1914 		}
1915 	}
1916 
1917 	return NULL;
1918 }
1919 
/* Final step of a base bdev removal: update the operational count, take the
 * whole raid bdev offline if it dropped below the minimum, and invoke the
 * removal callback. */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1941 
/* Unquiesce completion during base bdev removal: log a failure (if any) and
 * finish the removal with the unquiesce status. */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1955 
/* Per-channel step of base bdev removal: drop this io channel's reference to
 * the removed base bdev's channel and clear the processed-channel alias. */
static void
raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	uint8_t idx = raid_bdev_base_bdev_slot(base_info);

	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);

	if (raid_ch->base_channel[idx] != NULL) {
		spdk_put_io_channel(raid_ch->base_channel[idx]);
		raid_ch->base_channel[idx] = NULL;
	}

	if (raid_ch->process.ch_processed != NULL) {
		raid_ch->process.ch_processed->base_channel[idx] = NULL;
	}

	spdk_for_each_channel_continue(i, 0);
}
1977 
/* All io channels have released the base bdev: free its resources and resume
 * I/O on the raid bdev by unquiescing it. */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	raid_bdev_free_base_bdev_resource(base_info);

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1989 
/* Detach the base bdev from the raid configuration, then clear it from every
 * io channel before its resources are freed in the completion callback. */
static void
raid_bdev_remove_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	raid_bdev_deconfigure_base_bdev(base_info);

	spdk_for_each_channel(base_info->raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
			      raid_bdev_channels_remove_base_bdev_done);
}
1998 
1999 static void
2000 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2001 {
2002 	struct raid_base_bdev_info *base_info = ctx;
2003 
2004 	if (status != 0) {
2005 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
2006 			    raid_bdev->bdev.name, spdk_strerror(-status));
2007 		raid_bdev_remove_base_bdev_done(base_info, status);
2008 		return;
2009 	}
2010 
2011 	raid_bdev_remove_base_bdev_cont(base_info);
2012 }
2013 
/* Quiesce completion during base bdev removal. With the raid bdev quiesced,
 * update the superblock (if present) to mark the removed base bdev as FAILED
 * or MISSING before continuing with the actual removal. */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			/* Only the CONFIGURED sb entry matching this slot needs updating */
			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				if (base_info->is_failed) {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;
				} else {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_MISSING;
				}

				/* Removal continues in the superblock write completion */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}

	raid_bdev_remove_base_bdev_cont(base_info);
}
2051 
/* Start base bdev removal by quiescing the raid bdev; removal continues in
 * raid_bdev_remove_base_bdev_on_quiesced(). Must run on the app thread.
 * Returns 0 on successful submission or a negative errno. */
static int
raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
}
2060 
/* Message context used to decide, on the process thread, whether a running
 * background process must stop before a base bdev can be removed. */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The background process affected by the removal */
	struct raid_bdev_process *process;
	/* The base bdev being removed */
	struct raid_base_bdev_info *base_info;
	/* Operational base bdev count snapshotted at submission time */
	uint8_t num_base_bdevs_operational;
};
2066 
/* Runs on the app thread: start the quiesce-based removal of the base bdev,
 * completing immediately if the submission itself fails. */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc = raid_bdev_remove_base_bdev_quiesce(base_info);

	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2078 
2079 static void
2080 raid_bdev_process_base_bdev_remove_cont(void *_ctx)
2081 {
2082 	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2083 	struct raid_base_bdev_info *base_info = ctx->base_info;
2084 
2085 	free(ctx);
2086 
2087 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
2088 			     base_info);
2089 }
2090 
/* Runs on the process thread: decide whether the background process must be
 * stopped before the base bdev can be removed. If the removed bdev is not the
 * process target and enough operational bdevs remain, the removal proceeds
 * directly; otherwise the process is stopped first and the removal is queued
 * as a finish action. */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the removal until the process has fully stopped */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
2121 
2122 static int
2123 raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
2124 				   struct raid_base_bdev_info *base_info)
2125 {
2126 	struct raid_bdev_process_base_bdev_remove_ctx *ctx;
2127 
2128 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2129 
2130 	ctx = calloc(1, sizeof(*ctx));
2131 	if (ctx == NULL) {
2132 		return -ENOMEM;
2133 	}
2134 
2135 	/*
2136 	 * We have to send the process and num_base_bdevs_operational in the message ctx
2137 	 * because the process thread should not access raid_bdev's properties. Particularly,
2138 	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
2139 	 * will still be valid until the process is fully stopped.
2140 	 */
2141 	ctx->base_info = base_info;
2142 	ctx->process = process;
2143 	/*
2144 	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
2145 	 * after the removal and more than one base bdev may be removed at the same time
2146 	 */
2147 	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
2148 		if (base_info->is_configured && !base_info->remove_scheduled) {
2149 			ctx->num_base_bdevs_operational++;
2150 		}
2151 	}
2152 
2153 	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);
2154 
2155 	return 0;
2156 }
2157 
/* Core implementation of base bdev removal. Depending on the raid bdev state
 * this either frees resources directly (raid bdev not online), deconfigures
 * the whole raid bdev (no fault tolerance left), or quiesces and removes just
 * the one base bdev, coordinating with an active background process if any.
 * Must run on the app thread. Returns 0 or a negative errno; cb_fn is called
 * when the removal completes. */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Already being removed or never configured */
	if (base_info->remove_scheduled || !base_info->is_configured) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0 &&
		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		/* A running background process must be consulted (and possibly
		 * stopped) before the base bdev can be quiesced away */
		if (raid_bdev->process != NULL) {
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2215 
2216 /*
2217  * brief:
2218  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2219  * is removed. This function checks if this base bdev is part of any raid bdev
2220  * or not. If yes, it takes necessary action on that particular raid bdev.
2221  * params:
2222  * base_bdev - pointer to base bdev which got removed
2223  * cb_fn - callback function
2224  * cb_arg - argument to callback function
2225  * returns:
2226  * 0 - success
2227  * non zero - failure
2228  */
2229 int
2230 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2231 {
2232 	struct raid_base_bdev_info *base_info;
2233 
2234 	/* Find the raid_bdev which has claimed this base_bdev */
2235 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2236 	if (!base_info) {
2237 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2238 		return -ENODEV;
2239 	}
2240 
2241 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2242 }
2243 
2244 static void
2245 raid_bdev_fail_base_remove_cb(void *ctx, int status)
2246 {
2247 	struct raid_base_bdev_info *base_info = ctx;
2248 
2249 	if (status != 0) {
2250 		SPDK_WARNLOG("Failed to remove base bdev %s\n", base_info->name);
2251 		base_info->is_failed = false;
2252 	}
2253 }
2254 
2255 static void
2256 _raid_bdev_fail_base_bdev(void *ctx)
2257 {
2258 	struct raid_base_bdev_info *base_info = ctx;
2259 	int rc;
2260 
2261 	if (base_info->is_failed) {
2262 		return;
2263 	}
2264 	base_info->is_failed = true;
2265 
2266 	SPDK_NOTICELOG("Failing base bdev in slot %d ('%s') of raid bdev '%s'\n",
2267 		       raid_bdev_base_bdev_slot(base_info), base_info->name, base_info->raid_bdev->bdev.name);
2268 
2269 	rc = _raid_bdev_remove_base_bdev(base_info, raid_bdev_fail_base_remove_cb, base_info);
2270 	if (rc != 0) {
2271 		raid_bdev_fail_base_remove_cb(base_info, rc);
2272 	}
2273 }
2274 
/* Public entry point: mark a base bdev as failed and remove it from its raid
 * bdev. The work is executed on the app thread. */
void
raid_bdev_fail_base_bdev(struct raid_base_bdev_info *base_info)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_fail_base_bdev, base_info);
}
2280 
2281 static void
2282 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2283 {
2284 	if (status != 0) {
2285 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2286 			    raid_bdev->bdev.name, spdk_strerror(-status));
2287 	}
2288 }
2289 
2290 /*
2291  * brief:
2292  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2293  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2294  * If yes, call module handler to resize the raid_bdev if implemented.
2295  * params:
2296  * base_bdev - pointer to base bdev which got resized.
2297  * returns:
2298  * none
2299  */
2300 static void
2301 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2302 {
2303 	struct raid_bdev *raid_bdev;
2304 	struct raid_base_bdev_info *base_info;
2305 	uint64_t blockcnt_old;
2306 
2307 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2308 
2309 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2310 
2311 	/* Find the raid_bdev which has claimed this base_bdev */
2312 	if (!base_info) {
2313 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2314 		return;
2315 	}
2316 	raid_bdev = base_info->raid_bdev;
2317 
2318 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2319 
2320 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2321 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2322 
2323 	base_info->blockcnt = base_bdev->blockcnt;
2324 
2325 	if (!raid_bdev->module->resize) {
2326 		return;
2327 	}
2328 
2329 	blockcnt_old = raid_bdev->bdev.blockcnt;
2330 	if (raid_bdev->module->resize(raid_bdev) == false) {
2331 		return;
2332 	}
2333 
2334 	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
2335 		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);
2336 
2337 	if (raid_bdev->superblock_enabled) {
2338 		struct raid_bdev_superblock *sb = raid_bdev->sb;
2339 		uint8_t i;
2340 
2341 		for (i = 0; i < sb->base_bdevs_size; i++) {
2342 			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
2343 
2344 			if (sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2345 				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2346 				sb_base_bdev->data_size = base_info->data_size;
2347 			}
2348 		}
2349 		sb->raid_size = raid_bdev->bdev.blockcnt;
2350 		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
2351 	}
2352 }
2353 
2354 /*
2355  * brief:
2356  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2357  * triggers asynchronous event.
2358  * params:
2359  * type - event details.
2360  * bdev - bdev that triggered event.
2361  * event_ctx - context for event.
2362  * returns:
2363  * none
2364  */
2365 static void
2366 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2367 			  void *event_ctx)
2368 {
2369 	int rc;
2370 
2371 	switch (type) {
2372 	case SPDK_BDEV_EVENT_REMOVE:
2373 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2374 		if (rc != 0) {
2375 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2376 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2377 		}
2378 		break;
2379 	case SPDK_BDEV_EVENT_RESIZE:
2380 		raid_bdev_resize_base_bdev(bdev);
2381 		break;
2382 	default:
2383 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2384 		break;
2385 	}
2386 }
2387 
2388 /*
2389  * brief:
2390  * Deletes the specified raid bdev
2391  * params:
2392  * raid_bdev - pointer to raid bdev
2393  * cb_fn - callback function
2394  * cb_arg - argument to callback function
2395  */
2396 void
2397 raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
2398 {
2399 	struct raid_base_bdev_info *base_info;
2400 
2401 	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
2402 
2403 	if (raid_bdev->destroy_started) {
2404 		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
2405 			      raid_bdev->bdev.name);
2406 		if (cb_fn) {
2407 			cb_fn(cb_arg, -EALREADY);
2408 		}
2409 		return;
2410 	}
2411 
2412 	raid_bdev->destroy_started = true;
2413 
2414 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2415 		base_info->remove_scheduled = true;
2416 
2417 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2418 			/*
2419 			 * As raid bdev is not registered yet or already unregistered,
2420 			 * so cleanup should be done here itself.
2421 			 */
2422 			raid_bdev_free_base_bdev_resource(base_info);
2423 		}
2424 	}
2425 
2426 	if (raid_bdev->num_base_bdevs_discovered == 0) {
2427 		/* There is no base bdev for this raid, so free the raid device. */
2428 		raid_bdev_cleanup_and_free(raid_bdev);
2429 		if (cb_fn) {
2430 			cb_fn(cb_arg, 0);
2431 		}
2432 	} else {
2433 		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
2434 	}
2435 }
2436 
2437 static void
2438 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2439 {
2440 	if (status != 0) {
2441 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2442 			    raid_bdev->bdev.name, spdk_strerror(-status));
2443 	}
2444 }
2445 
2446 static void
2447 raid_bdev_process_finish_write_sb(void *ctx)
2448 {
2449 	struct raid_bdev *raid_bdev = ctx;
2450 	struct raid_bdev_superblock *sb = raid_bdev->sb;
2451 	struct raid_bdev_sb_base_bdev *sb_base_bdev;
2452 	struct raid_base_bdev_info *base_info;
2453 	uint8_t i;
2454 
2455 	for (i = 0; i < sb->base_bdevs_size; i++) {
2456 		sb_base_bdev = &sb->base_bdevs[i];
2457 
2458 		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
2459 		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2460 			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2461 			if (base_info->is_configured) {
2462 				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
2463 				sb_base_bdev->data_offset = base_info->data_offset;
2464 				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
2465 			}
2466 		}
2467 	}
2468 
2469 	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
2470 }
2471 
2472 static void raid_bdev_process_free(struct raid_bdev_process *process);
2473 
2474 static void
2475 _raid_bdev_process_finish_done(void *ctx)
2476 {
2477 	struct raid_bdev_process *process = ctx;
2478 	struct raid_process_finish_action *finish_action;
2479 
2480 	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
2481 		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
2482 		finish_action->cb(finish_action->cb_ctx);
2483 		free(finish_action);
2484 	}
2485 
2486 	spdk_poller_unregister(&process->qos.process_continue_poller);
2487 
2488 	raid_bdev_process_free(process);
2489 
2490 	spdk_thread_exit(spdk_get_thread());
2491 }
2492 
/* Completion of the target base bdev removal done when a process failed;
 * logs any error and proceeds to the final per-process cleanup. */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2504 
2505 static void
2506 raid_bdev_process_finish_unquiesced(void *ctx, int status)
2507 {
2508 	struct raid_bdev_process *process = ctx;
2509 
2510 	if (status != 0) {
2511 		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
2512 	}
2513 
2514 	if (process->status != 0) {
2515 		status = _raid_bdev_remove_base_bdev(process->target, raid_bdev_process_finish_target_removed,
2516 						     process);
2517 		if (status != 0) {
2518 			raid_bdev_process_finish_target_removed(process, status);
2519 		}
2520 		return;
2521 	}
2522 
2523 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2524 }
2525 
2526 static void
2527 raid_bdev_process_finish_unquiesce(void *ctx)
2528 {
2529 	struct raid_bdev_process *process = ctx;
2530 	int rc;
2531 
2532 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2533 				 raid_bdev_process_finish_unquiesced, process);
2534 	if (rc != 0) {
2535 		raid_bdev_process_finish_unquiesced(process, rc);
2536 	}
2537 }
2538 
/* Runs on the process thread after all channels cleaned up their process
 * state: release the raid io channel, log the result, trigger a superblock
 * update on success, and unquiesce the raid bdev. */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		if (raid_bdev->superblock_enabled) {
			/* Superblock writes must happen on the app thread */
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2570 
/* Channel iteration complete; continue finishing on the process thread. */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2578 
2579 static void
2580 raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
2581 {
2582 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2583 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2584 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2585 
2586 	if (process->status == 0) {
2587 		uint8_t slot = raid_bdev_base_bdev_slot(process->target);
2588 
2589 		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
2590 		raid_ch->process.target_ch = NULL;
2591 	}
2592 
2593 	raid_bdev_ch_process_cleanup(raid_ch);
2594 
2595 	spdk_for_each_channel_continue(i, 0);
2596 }
2597 
/* Quiesce completion when finishing a process: detach the process from the
 * raid bdev and clean up each io channel's process state. */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): on quiesce failure the finish sequence stops here;
		 * confirm whether this can leave the process permanently unfinished */
		return;
	}

	raid_bdev->process = NULL;
	process->target->is_process_target = false;

	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2615 
2616 static void
2617 _raid_bdev_process_finish(void *ctx)
2618 {
2619 	struct raid_bdev_process *process = ctx;
2620 	int rc;
2621 
2622 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2623 			       raid_bdev_process_finish_quiesced, process);
2624 	if (rc != 0) {
2625 		raid_bdev_process_finish_quiesced(ctx, rc);
2626 	}
2627 }
2628 
/* Kick off process teardown by quiescing the raid bdev on the app thread. */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2634 
2635 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2636 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2637 
/* Request the process to stop with the given status. Safe to call multiple
 * times; only the first non-zero status is kept. Must run on the process
 * thread. */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	/* Already stopping or stopped */
	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	/* A locked window must be released before the stop can proceed */
	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2660 
/* LBA range unlock completion: on success advance the process window by the
 * amount just processed and continue; on failure finish the process. */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2677 
/* Release (unquiesce) the currently locked process window range; continues
 * in raid_bdev_process_window_range_unlocked(). */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2692 
/* All channels updated their process offset; release the window lock. */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2700 
2701 static void
2702 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2703 {
2704 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2705 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2706 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2707 
2708 	raid_ch->process.offset = process->window_offset + process->window_size;
2709 
2710 	spdk_for_each_channel_continue(i, 0);
2711 }
2712 
/* Called by raid modules when a process request completes. Returns the
 * request to the free list and, once the whole window has completed, either
 * finishes the process on error or propagates the new offset to all channels. */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Remember the first error; the window still drains fully */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2738 
/* Take a free process request and submit it to the raid module for the given
 * range. Returns the number of blocks the module accepted, 0 if no request is
 * currently available (or the module accepted nothing), or a negative errno. */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests are in flight; the window still has work pending */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may accept fewer blocks than requested */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2774 
/* Submit as many process requests as possible for the current window and
 * record how many blocks are outstanding. If nothing could be submitted the
 * process is finished with the accumulated window status. */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* Completion continues in raid_bdev_process_request_complete() */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2799 
/* LBA range lock completion for the next process window. If the process was
 * asked to stop while the lock was being taken, release it immediately. */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2820 
2821 static bool
2822 raid_bdev_process_consume_token(struct raid_bdev_process *process)
2823 {
2824 	struct raid_bdev *raid_bdev = process->raid_bdev;
2825 	uint64_t now = spdk_get_ticks();
2826 
2827 	process->qos.bytes_available = spdk_min(process->qos.bytes_max,
2828 						process->qos.bytes_available +
2829 						(now - process->qos.last_tsc) * process->qos.bytes_per_tsc);
2830 	process->qos.last_tsc = now;
2831 	if (process->qos.bytes_available > 0.0) {
2832 		process->qos.bytes_available -= process->window_size * raid_bdev->bdev.blocklen;
2833 		return true;
2834 	}
2835 	return false;
2836 }
2837 
/* Try to lock (quiesce) the next window range. When QoS is enabled and the
 * token bucket is empty, the attempt is deferred to the continue poller and
 * false is returned; otherwise the lock is initiated and true is returned. */
static bool
raid_bdev_process_lock_window_range(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(process->window_range_locked == false);

	if (process->qos.enable_qos) {
		if (raid_bdev_process_consume_token(process)) {
			/* Budget available: no need for the retry poller */
			spdk_poller_pause(process->qos.process_continue_poller);
		} else {
			/* Throttled: let the poller retry until tokens refill */
			spdk_poller_resume(process->qos.process_continue_poller);
			return false;
		}
	}

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
	return true;
}
2863 
2864 static int
2865 raid_bdev_process_continue_poll(void *arg)
2866 {
2867 	struct raid_bdev_process *process = arg;
2868 
2869 	if (raid_bdev_process_lock_window_range(process)) {
2870 		return SPDK_POLLER_BUSY;
2871 	}
2872 	return SPDK_POLLER_IDLE;
2873 }
2874 
/* Main loop step of the process thread: stop if requested, finish when the
 * whole bdev has been covered, otherwise lock the next window. */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window so it never extends past the end of the bdev */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);
	raid_bdev_process_lock_window_range(process);
}
2899 
/* First function run on the newly created process thread: acquire the raid
 * io channel, set up the QoS poller if enabled, and start processing. */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	if (process->qos.enable_qos) {
		/* Poller starts paused; it is resumed only when QoS throttles */
		process->qos.process_continue_poller = SPDK_POLLER_REGISTER(raid_bdev_process_continue_poll,
						       process, 0);
		spdk_poller_pause(process->qos.process_continue_poller);
	}

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2930 
/* Rollback completion after a failed process start: remove the would-be
 * target base bdev and free the process. */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2941 
/* Per-channel rollback of a failed process start: undo the channel's
 * process setup. */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2952 
/* All channels finished their process setup. Re-check that the target is
 * still usable, then create a dedicated thread for the process; on any
 * failure roll back the per-channel setup. */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status == 0 &&
	    (process->target->remove_scheduled || !process->target->is_configured ||
	     raid_bdev->num_base_bdevs_operational <= raid_bdev->min_base_bdevs_operational)) {
		/* a base bdev was removed before we got here */
		status = -ENODEV;
	}

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2994 
/*
 * Per-channel handler for starting a process: set up the process state on
 * this raid io channel and report the result to the channel iterator.
 */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *iter)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(iter);
	struct raid_bdev_io_channel *raid_ch;
	int rc;

	raid_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(iter));
	rc = raid_bdev_ch_process_setup(raid_ch, process);

	spdk_for_each_channel_continue(iter, rc);
}
3007 
3008 static void
3009 raid_bdev_process_start(struct raid_bdev_process *process)
3010 {
3011 	struct raid_bdev *raid_bdev = process->raid_bdev;
3012 
3013 	assert(raid_bdev->module->submit_process_request != NULL);
3014 
3015 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
3016 			      raid_bdev_channels_start_process_done);
3017 }
3018 
3019 static void
3020 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
3021 {
3022 	spdk_dma_free(process_req->iov.iov_base);
3023 	spdk_dma_free(process_req->md_buf);
3024 	free(process_req);
3025 }
3026 
3027 static struct raid_bdev_process_request *
3028 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
3029 {
3030 	struct raid_bdev *raid_bdev = process->raid_bdev;
3031 	struct raid_bdev_process_request *process_req;
3032 
3033 	process_req = calloc(1, sizeof(*process_req));
3034 	if (process_req == NULL) {
3035 		return NULL;
3036 	}
3037 
3038 	process_req->process = process;
3039 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
3040 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
3041 	if (process_req->iov.iov_base == NULL) {
3042 		free(process_req);
3043 		return NULL;
3044 	}
3045 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
3046 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
3047 		if (process_req->md_buf == NULL) {
3048 			raid_bdev_process_request_free(process_req);
3049 			return NULL;
3050 		}
3051 	}
3052 
3053 	return process_req;
3054 }
3055 
3056 static void
3057 raid_bdev_process_free(struct raid_bdev_process *process)
3058 {
3059 	struct raid_bdev_process_request *process_req;
3060 
3061 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
3062 		TAILQ_REMOVE(&process->requests, process_req, link);
3063 		raid_bdev_process_request_free(process_req);
3064 	}
3065 
3066 	free(process);
3067 }
3068 
/*
 * Allocate a background process context for the given raid bdev and target
 * base bdev, including a pool of RAID_BDEV_PROCESS_MAX_QD preallocated
 * requests and, when a bandwidth limit is configured, QoS rate-limiting
 * state. Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	/* window size in blocks: the configured window size (KiB) converted to
	 * data blocks, but never smaller than the raid bdev's write unit */
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	if (g_opts.process_max_bandwidth_mb_sec != 0) {
		/* token-bucket style QoS: bytes_per_tsc is the refill rate per tick,
		 * bytes_max caps the bucket at one millisecond worth of bandwidth */
		process->qos.enable_qos = true;
		process->qos.last_tsc = spdk_get_ticks();
		process->qos.bytes_per_tsc = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 /
					     spdk_get_ticks_hz();
		process->qos.bytes_max = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 / SPDK_SEC_TO_MSEC;
		process->qos.bytes_available = 0.0;
	}

	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			/* also frees the requests queued so far */
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
3112 
3113 static int
3114 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
3115 {
3116 	struct raid_bdev_process *process;
3117 
3118 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3119 
3120 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
3121 	if (process == NULL) {
3122 		return -ENOMEM;
3123 	}
3124 
3125 	raid_bdev_process_start(process);
3126 
3127 	return 0;
3128 }
3129 
3130 static void raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info);
3131 
/* Channel-iteration completion that resumes base bdev configuration. */
static void
_raid_bdev_configure_base_bdev_cont(struct spdk_io_channel_iter *iter, int status)
{
	raid_bdev_configure_base_bdev_cont(spdk_io_channel_iter_get_ctx(iter));
}
3139 
/*
 * No-op per-channel callback used purely for synchronization: touching every
 * channel before the iteration's completion runs ensures the channel threads
 * have observed state set on the app thread (see the is_process_target note
 * in raid_bdev_configure_base_bdev_cont()).
 */
static void
raid_bdev_ch_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
3145 
/*
 * Continue configuring a base bdev after it has been opened and claimed.
 * If the bdev is being added to an already complete, online array, it is
 * first marked as a process (rebuild) target; otherwise it is marked
 * configured and - once all operational members are discovered - the raid
 * bdev itself is configured, or a rebuild is started for a process target.
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	raid_base_bdev_cb configure_cb;
	int rc;

	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational &&
	    base_info->is_process_target == false) {
		/* TODO: defer if rebuild in progress on another base bdev */
		assert(raid_bdev->process == NULL);
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		base_info->is_process_target = true;
		/* To assure is_process_target is set before is_configured when checked in raid_bdev_create_cb() */
		spdk_for_each_channel(raid_bdev, raid_bdev_ch_sync, base_info, _raid_bdev_configure_base_bdev_cont);
		return;
	}

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/* take the callback over locally so it is invoked at most once below */
	configure_cb = base_info->configure_cb;
	base_info->configure_cb = NULL;
	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev, configure_cb, base_info->configure_cb_ctx);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		} else {
			/* raid_bdev_configure() now owns the callback */
			configure_cb = NULL;
		}
	} else if (base_info->is_process_target) {
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		rc = 0;
	}

	if (configure_cb != NULL) {
		configure_cb(base_info->configure_cb_ctx, rc);
	}
}
3201 
3202 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3203 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3204 
/*
 * Completion of the superblock check done when adding a new base bdev.
 * A superblock of this raid bdev re-routes the bdev through the regular
 * examine path; a foreign superblock fails the add with -EEXIST; no
 * superblock (-EINVAL) lets the normal configuration continue.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	raid_base_bdev_cb configure_cb = base_info->configure_cb;

	switch (status) {
	case 0:
		/* valid superblock found */
		base_info->configure_cb = NULL;
		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);

			/* the bdev already belongs to this raid bdev - release the slot and
			 * re-add it through the superblock examine path instead */
			raid_bdev_free_base_bdev_resource(base_info);
			raid_bdev_examine_sb(sb, bdev, configure_cb, base_info->configure_cb_ctx);
			return;
		}
		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	/* error path: report the failure to the caller, if a callback was given */
	if (configure_cb != NULL) {
		base_info->configure_cb = NULL;
		configure_cb(base_info->configure_cb_ctx, status);
	}
}
3242 
/*
 * Open, claim and validate a base bdev identified by base_info's name/uuid,
 * then continue configuration (directly for 'existing' members, or after a
 * superblock check for newly added ones). Must be called on the app thread.
 *
 * existing - true when re-attaching a known member (skips the superblock check)
 * cb_fn/cb_ctx - completion callback invoked when configuration finishes
 * Returns 0 on success or a negative errno; on failure all resources
 * acquired here are released again.
 */
static int
raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
			      raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	const struct spdk_uuid *bdev_uuid;
	int rc;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->desc == NULL);

	/*
	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
	 * before claiming the bdev.
	 */

	if (!spdk_uuid_is_null(&base_info->uuid)) {
		char uuid_str[SPDK_UUID_STRING_LEN];
		const char *bdev_name;

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* UUID of a bdev is registered as its alias */
		bdev = spdk_bdev_get_by_name(uuid_str);
		if (bdev == NULL) {
			return -ENODEV;
		}

		bdev_name = spdk_bdev_get_name(bdev);

		if (base_info->name == NULL) {
			assert(existing == true);
			base_info->name = strdup(bdev_name);
			if (base_info->name == NULL) {
				return -ENOMEM;
			}
		} else if (strcmp(base_info->name, bdev_name) != 0) {
			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
				    bdev_name, base_info->name);
			return -EINVAL;
		}
	}

	assert(base_info->name != NULL);

	/* open the bdev for writing; removal events arrive via raid_bdev_event_base_bdev */
	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
	if (rc != 0) {
		if (rc != -ENODEV) {
			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
		}
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	bdev_uuid = spdk_bdev_get_uuid(bdev);

	/* record or cross-check the bdev's uuid against the expected one */
	if (spdk_uuid_is_null(&base_info->uuid)) {
		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
		spdk_bdev_close(desc);
		return -EINVAL;
	}

	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
		spdk_bdev_close(desc);
		return rc;
	}

	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);

	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
	if (base_info->app_thread_ch == NULL) {
		SPDK_ERRLOG("Failed to get io channel\n");
		spdk_bdev_module_release_bdev(bdev);
		spdk_bdev_close(desc);
		return -ENOMEM;
	}

	base_info->desc = desc;
	base_info->blockcnt = bdev->blockcnt;

	/* with a superblock, user data starts after a reserved region at the
	 * beginning of the base bdev */
	if (raid_bdev->superblock_enabled) {
		uint64_t data_offset;

		if (base_info->data_offset == 0) {
			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
		} else {
			data_offset = base_info->data_offset;
		}

		if (bdev->optimal_io_boundary != 0) {
			/* align the data offset up to the optimal io boundary, but a
			 * pre-existing (superblock) offset always wins */
			data_offset = spdk_divide_round_up(data_offset,
							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
					     base_info->data_offset, base_info->name, data_offset);
				data_offset = base_info->data_offset;
			}
		}

		base_info->data_offset = data_offset;
	}

	/* sanity-check offset and size against the base bdev capacity */
	if (base_info->data_offset >= bdev->blockcnt) {
		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
			    base_info->data_offset, bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (base_info->data_size == 0) {
		base_info->data_size = bdev->blockcnt - base_info->data_offset;
	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
			    bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
			    bdev->name);
		rc = -EINVAL;
		goto out;
	}

	/*
	 * Set the raid bdev properties if this is the first base bdev configured,
	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
	 * have the same blocklen and metadata format.
	 */
	if (raid_bdev->bdev.blocklen == 0) {
		raid_bdev->bdev.blocklen = bdev->blocklen;
		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
		raid_bdev->bdev.dif_pi_format = bdev->dif_pi_format;
	} else {
		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
			rc = -EINVAL;
			goto out;
		}

		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev) ||
		    raid_bdev->bdev.dif_pi_format != bdev->dif_pi_format) {
			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
				    raid_bdev->bdev.name, bdev->name);
			rc = -EINVAL;
			goto out;
		}
	}

	assert(base_info->configure_cb == NULL);
	base_info->configure_cb = cb_fn;
	base_info->configure_cb_ctx = cb_ctx;

	if (existing) {
		raid_bdev_configure_base_bdev_cont(base_info);
	} else {
		/* check for existing superblock when using a new bdev */
		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
		if (rc) {
			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
				    bdev->name, spdk_strerror(-rc));
		}
	}
out:
	if (rc != 0) {
		/* release the descriptor, channel and claim acquired above */
		base_info->configure_cb = NULL;
		raid_bdev_free_base_bdev_resource(base_info);
	}
	return rc;
}
3431 
/*
 * Add the bdev named 'name' as a base bdev of the given raid bdev.
 * Picks a slot for it - preferring, while still configuring, an unnamed slot
 * whose uuid matches the bdev, otherwise any completely empty slot - and then
 * configures it. Must be called on the app thread.
 * Returns 0 on success or a negative errno (-EPERM while a background process
 * is running, -EINVAL when no suitable slot exists).
 */
int
raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
			raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_base_bdev_info *base_info = NULL, *iter;
	int rc;

	assert(name != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->process != NULL) {
		SPDK_ERRLOG("raid bdev '%s' is in process\n",
			    raid_bdev->bdev.name);
		return -EPERM;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);

		/* prefer a slot that already expects this bdev by uuid */
		if (bdev != NULL) {
			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
				if (iter->name == NULL &&
				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
					base_info = iter;
					break;
				}
			}
		}
	}

	/* otherwise (or when online) fall back to the first fully empty slot */
	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
				base_info = iter;
				break;
			}
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
			    raid_bdev->bdev.name, name);
		return -EINVAL;
	}

	assert(base_info->is_configured == false);

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		assert(base_info->data_size != 0);
		assert(base_info->desc == NULL);
	}

	base_info->name = strdup(name);
	if (base_info->name == NULL) {
		return -ENOMEM;
	}

	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
		/* -ENODEV while configuring is tolerated - the bdev may appear later */
		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
		free(base_info->name);
		base_info->name = NULL;
	}

	return rc;
}
3498 
3499 static int
3500 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3501 {
3502 	struct raid_bdev *raid_bdev;
3503 	uint8_t i;
3504 	int rc;
3505 
3506 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3507 			       sb->level, true, &sb->uuid, &raid_bdev);
3508 	if (rc != 0) {
3509 		return rc;
3510 	}
3511 
3512 	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3513 	if (rc != 0) {
3514 		raid_bdev_free(raid_bdev);
3515 		return rc;
3516 	}
3517 
3518 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3519 	memcpy(raid_bdev->sb, sb, sb->length);
3520 
3521 	for (i = 0; i < sb->base_bdevs_size; i++) {
3522 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3523 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3524 
3525 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3526 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3527 			raid_bdev->num_base_bdevs_operational++;
3528 		}
3529 
3530 		base_info->data_offset = sb_base_bdev->data_offset;
3531 		base_info->data_size = sb_base_bdev->data_size;
3532 	}
3533 
3534 	*raid_bdev_out = raid_bdev;
3535 	return 0;
3536 }
3537 
3538 static void
3539 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3540 {
3541 	struct raid_bdev *raid_bdev;
3542 	struct raid_base_bdev_info *base_info;
3543 
3544 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3545 		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3546 			continue;
3547 		}
3548 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3549 			if (base_info->desc == NULL &&
3550 			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3551 			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3552 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3553 				break;
3554 			}
3555 		}
3556 	}
3557 }
3558 
/*
 * Context for examining the remaining base bdevs of a raid bdev created
 * during examine, so that any already-registered members are configured too.
 */
struct raid_bdev_examine_others_ctx {
	/* uuid of the raid bdev whose members are being examined */
	struct spdk_uuid raid_bdev_uuid;
	/* slot index from which to resume the scan on the next round */
	uint8_t current_base_bdev_idx;
	/* completion callback (may be NULL) and its argument */
	raid_base_bdev_cb cb_fn;
	void *cb_ctx;
};
3565 
3566 static void
3567 raid_bdev_examine_others_done(void *_ctx, int status)
3568 {
3569 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3570 
3571 	if (ctx->cb_fn != NULL) {
3572 		ctx->cb_fn(ctx->cb_ctx, status);
3573 	}
3574 	free(ctx);
3575 }
3576 
3577 typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
3578 		const struct raid_bdev_superblock *sb, int status, void *ctx);
3579 static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
3580 				     void *cb_ctx);
3581 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3582 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3583 static void raid_bdev_examine_others(void *_ctx, int status);
3584 
3585 static void
3586 raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3587 				 int status, void *_ctx)
3588 {
3589 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3590 
3591 	if (status != 0) {
3592 		raid_bdev_examine_others_done(ctx, status);
3593 		return;
3594 	}
3595 
3596 	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3597 }
3598 
/*
 * Step of the examine-others iteration, also used as the configure callback
 * of the previous step. Scans the raid bdev's slots starting from the saved
 * index for an unconfigured member whose bdev is already registered, and
 * kicks off a superblock load for it; finishes the iteration when no such
 * member remains or on a fatal error.
 */
static void
raid_bdev_examine_others(void *_ctx, int status)
{
	struct raid_bdev_examine_others_ctx *ctx = _ctx;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	/* -EEXIST (member already configured) is not fatal for the iteration */
	if (status != 0 && status != -EEXIST) {
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
	if (raid_bdev == NULL) {
		status = -ENODEV;
		goto out;
	}

	/* resume the scan where the previous step left off */
	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
	     base_info++) {
		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
			continue;
		}

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* the member's bdev is looked up by its uuid alias */
		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
			continue;
		}

		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);

		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
		if (status != 0) {
			continue;
		}
		/* iteration continues asynchronously in raid_bdev_examine_others_load_cb() */
		return;
	}
out:
	raid_bdev_examine_others_done(ctx, status);
}
3641 
3642 static void
3643 raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3644 		     raid_base_bdev_cb cb_fn, void *cb_ctx)
3645 {
3646 	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
3647 	struct raid_bdev *raid_bdev;
3648 	struct raid_base_bdev_info *iter, *base_info;
3649 	uint8_t i;
3650 	int rc;
3651 
3652 	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
3653 		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
3654 			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
3655 		rc = -EINVAL;
3656 		goto out;
3657 	}
3658 
3659 	if (spdk_uuid_is_null(&sb->uuid)) {
3660 		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
3661 		rc = -EINVAL;
3662 		goto out;
3663 	}
3664 
3665 	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);
3666 
3667 	if (raid_bdev) {
3668 		if (sb->seq_number > raid_bdev->sb->seq_number) {
3669 			SPDK_DEBUGLOG(bdev_raid,
3670 				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
3671 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3672 
3673 			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
3674 				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
3675 					     raid_bdev->bdev.name, bdev->name);
3676 				rc = -EBUSY;
3677 				goto out;
3678 			}
3679 
3680 			/* remove and then recreate the raid bdev using the newer superblock */
3681 			raid_bdev_delete(raid_bdev, NULL, NULL);
3682 			raid_bdev = NULL;
3683 		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
3684 			SPDK_DEBUGLOG(bdev_raid,
3685 				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
3686 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3687 			/* use the current raid bdev superblock */
3688 			sb = raid_bdev->sb;
3689 		}
3690 	}
3691 
3692 	for (i = 0; i < sb->base_bdevs_size; i++) {
3693 		sb_base_bdev = &sb->base_bdevs[i];
3694 
3695 		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);
3696 
3697 		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3698 			break;
3699 		}
3700 	}
3701 
3702 	if (i == sb->base_bdevs_size) {
3703 		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
3704 		rc = -EINVAL;
3705 		goto out;
3706 	}
3707 
3708 	if (!raid_bdev) {
3709 		struct raid_bdev_examine_others_ctx *ctx;
3710 
3711 		ctx = calloc(1, sizeof(*ctx));
3712 		if (ctx == NULL) {
3713 			rc = -ENOMEM;
3714 			goto out;
3715 		}
3716 
3717 		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
3718 		if (rc != 0) {
3719 			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
3720 				    sb->name, spdk_strerror(-rc));
3721 			free(ctx);
3722 			goto out;
3723 		}
3724 
3725 		/* after this base bdev is configured, examine other base bdevs that may be present */
3726 		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
3727 		ctx->cb_fn = cb_fn;
3728 		ctx->cb_ctx = cb_ctx;
3729 
3730 		cb_fn = raid_bdev_examine_others;
3731 		cb_ctx = ctx;
3732 	}
3733 
3734 	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3735 		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
3736 		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3737 		assert(base_info->is_configured == false);
3738 		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
3739 		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
3740 		assert(spdk_uuid_is_null(&base_info->uuid));
3741 		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3742 		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
3743 		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3744 		if (rc != 0) {
3745 			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3746 				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3747 		}
3748 		goto out;
3749 	}
3750 
3751 	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
3752 		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
3753 			       bdev->name, raid_bdev->bdev.name);
3754 		rc = -EINVAL;
3755 		goto out;
3756 	}
3757 
3758 	base_info = NULL;
3759 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3760 		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3761 			base_info = iter;
3762 			break;
3763 		}
3764 	}
3765 
3766 	if (base_info == NULL) {
3767 		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
3768 			    bdev->name, raid_bdev->bdev.name);
3769 		rc = -EINVAL;
3770 		goto out;
3771 	}
3772 
3773 	if (base_info->is_configured) {
3774 		rc = -EEXIST;
3775 		goto out;
3776 	}
3777 
3778 	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3779 	if (rc != 0) {
3780 		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3781 			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3782 	}
3783 out:
3784 	if (rc != 0 && cb_fn != 0) {
3785 		cb_fn(cb_ctx, rc);
3786 	}
3787 }
3788 
/* State of an asynchronous superblock load performed during bdev examine. */
struct raid_bdev_examine_ctx {
	/* read-only descriptor on the examined bdev */
	struct spdk_bdev_desc *desc;
	/* io channel used for the superblock read */
	struct spdk_io_channel *ch;
	/* callback receiving the load result, and its argument */
	raid_bdev_examine_load_sb_cb cb;
	void *cb_ctx;
};
3795 
3796 static void
3797 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3798 {
3799 	if (!ctx) {
3800 		return;
3801 	}
3802 
3803 	if (ctx->ch) {
3804 		spdk_put_io_channel(ctx->ch);
3805 	}
3806 
3807 	if (ctx->desc) {
3808 		spdk_bdev_close(ctx->desc);
3809 	}
3810 
3811 	free(ctx);
3812 }
3813 
3814 static void
3815 raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3816 {
3817 	struct raid_bdev_examine_ctx *ctx = _ctx;
3818 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3819 
3820 	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3821 
3822 	raid_bdev_examine_ctx_free(ctx);
3823 }
3824 
/*
 * Event callback for the descriptor temporarily opened during examine.
 * Intentionally empty - events on that bdev are ignored here.
 */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3829 
3830 static int
3831 raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
3832 {
3833 	struct raid_bdev_examine_ctx *ctx;
3834 	int rc;
3835 
3836 	assert(cb != NULL);
3837 
3838 	ctx = calloc(1, sizeof(*ctx));
3839 	if (!ctx) {
3840 		return -ENOMEM;
3841 	}
3842 
3843 	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
3844 	if (rc) {
3845 		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
3846 		goto err;
3847 	}
3848 
3849 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3850 	if (!ctx->ch) {
3851 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
3852 		rc = -ENOMEM;
3853 		goto err;
3854 	}
3855 
3856 	ctx->cb = cb;
3857 	ctx->cb_ctx = cb_ctx;
3858 
3859 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
3860 	if (rc) {
3861 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3862 			    bdev_name, spdk_strerror(-rc));
3863 		goto err;
3864 	}
3865 
3866 	return 0;
3867 err:
3868 	raid_bdev_examine_ctx_free(ctx);
3869 	return rc;
3870 }
3871 
/*
 * Final step of examining a bdev: log any error and tell the bdev layer that
 * this module's examine is complete.
 */
static void
raid_bdev_examine_done(void *ctx, int status)
{
	struct spdk_bdev *bdev = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    bdev->name, spdk_strerror(-status));
	}
	spdk_bdev_module_examine_done(&g_raid_if);
}
3883 
3884 static void
3885 raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3886 		       void *ctx)
3887 {
3888 	switch (status) {
3889 	case 0:
3890 		/* valid superblock found */
3891 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3892 		raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_done, bdev);
3893 		return;
3894 	case -EINVAL:
3895 		/* no valid superblock, check if it can be claimed anyway */
3896 		raid_bdev_examine_no_sb(bdev);
3897 		status = 0;
3898 		break;
3899 	}
3900 
3901 	raid_bdev_examine_done(bdev, status);
3902 }
3903 
3904 /*
3905  * brief:
3906  * raid_bdev_examine function is the examine function call by the below layers
3907  * like bdev_nvme layer. This function will check if this base bdev can be
3908  * claimed by this raid bdev or not.
3909  * params:
3910  * bdev - pointer to base bdev
3911  * returns:
3912  * none
3913  */
3914 static void
3915 raid_bdev_examine(struct spdk_bdev *bdev)
3916 {
3917 	int rc = 0;
3918 
3919 	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
3920 		goto done;
3921 	}
3922 
3923 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3924 		raid_bdev_examine_no_sb(bdev);
3925 		goto done;
3926 	}
3927 
3928 	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
3929 	if (rc != 0) {
3930 		goto done;
3931 	}
3932 
3933 	return;
3934 done:
3935 	raid_bdev_examine_done(bdev, rc);
3936 }
3937 
3938 /* Log component for bdev raid bdev module */
3939 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3940