xref: /spdk/module/bdev/raid/bdev_raid.c (revision b6875e1ce57743f3b1416016b9c624d79a862af9)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
16 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
17 #define RAID_BDEV_PROCESS_MAX_QD	16
18 
19 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT	1024
20 #define RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT	0
21 
22 static bool g_shutdown_started = false;
23 
24 /* List of all raid bdevs */
25 struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
26 
27 static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
28 
/*
 * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
 * contains the relationship of raid bdev io channel with base bdev io channels.
 */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs, indexed by base bdev slot.
	 * An entry is NULL for a missing base bdev or for the process target
	 * (see raid_bdev_create_cb()). */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel; set only when the raid module
	 * implements get_io_channel() */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Boundary between the already processed range (below) and the
		 * unprocessed range; RAID_OFFSET_BLOCKS_INVALID when no process
		 * is active on this channel */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Shadow channel where the target's slot is populated with
		 * target_ch; used for IO to the processed range */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
47 
/* Lifecycle states of a raid bdev background process (e.g. rebuild) */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
54 
/* Bandwidth throttling state for a background process */
struct raid_process_qos {
	/* When false, the process runs unthrottled.
	 * NOTE(review): presumably set when a max-bandwidth limit is configured
	 * (process_max_bandwidth_mb_sec != 0) — confirm in process start code. */
	bool enable_qos;
	/* Tick counter value at the last budget refill */
	uint64_t last_tsc;
	/* Byte budget gained per elapsed tick */
	double bytes_per_tsc;
	/* Currently available byte budget */
	double bytes_available;
	/* Cap on the accumulated byte budget */
	double bytes_max;
	/* Poller that resumes the process once budget becomes available */
	struct spdk_poller *process_continue_poller;
};
63 
/* State of one background process operating on a raid bdev */
struct raid_bdev_process {
	/* The raid bdev this process operates on */
	struct raid_bdev		*raid_bdev;
	/* Type of the process (e.g. rebuild) */
	enum raid_process_type		type;
	/* Current lifecycle state */
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	/* Dedicated raid IO channel used by the process */
	struct raid_bdev_io_channel	*raid_ch;
	/* Pool of reusable process requests */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* Configured upper bound for the window size, in blocks */
	uint64_t			max_window_size;
	/* Size of the currently processed window, in blocks */
	uint64_t			window_size;
	/* Blocks still outstanding in the current window */
	uint64_t			window_remaining;
	/* First error status encountered in the current window, if any */
	int				window_status;
	/* Starting offset of the current window */
	uint64_t			window_offset;
	/* True while the current window's LBA range is locked (quiesced) */
	bool				window_range_locked;
	/* Base bdev being built/written by the process */
	struct raid_base_bdev_info	*target;
	/* Overall process status, reported on finish */
	int				status;
	/* Callbacks to run when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
	/* Bandwidth throttling state */
	struct raid_process_qos		qos;
};
82 
/* A callback queued to be invoked when a background process finishes */
struct raid_process_finish_action {
	spdk_msg_fn cb;
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
88 
/* Global raid bdev module options; accessed via raid_bdev_get_opts()/raid_bdev_set_opts() */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
	.process_max_bandwidth_mb_sec = RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT,
};
93 
/* Copy the current global raid bdev module options into *opts. */
void
raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
{
	*opts = g_opts;
}
99 
100 int
101 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
102 {
103 	if (opts->process_window_size_kb == 0) {
104 		return -EINVAL;
105 	}
106 
107 	g_opts = *opts;
108 
109 	return 0;
110 }
111 
112 static struct raid_bdev_module *
113 raid_bdev_module_find(enum raid_level level)
114 {
115 	struct raid_bdev_module *raid_module;
116 
117 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
118 		if (raid_module->level == level) {
119 			return raid_module;
120 		}
121 	}
122 
123 	return NULL;
124 }
125 
126 void
127 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
128 {
129 	if (raid_bdev_module_find(raid_module->level) != NULL) {
130 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
131 			    raid_bdev_level_to_str(raid_module->level));
132 		assert(false);
133 	} else {
134 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
135 	}
136 }
137 
/* Return the IO channel of the base bdev in the given slot; may be NULL if
 * the base bdev is missing or is the process target. */
struct spdk_io_channel *
raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
{
	return raid_ch->base_channel[idx];
}
143 
/* Return the module-private context of this raid IO channel.
 * Only valid when the raid module provides its own IO channel. */
void *
raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
{
	assert(raid_ch->module_channel != NULL);

	return spdk_io_channel_get_ctx(raid_ch->module_channel);
}
151 
152 struct raid_base_bdev_info *
153 raid_bdev_channel_get_base_info(struct raid_bdev_io_channel *raid_ch, struct spdk_bdev *base_bdev)
154 {
155 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
156 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
157 	uint8_t i;
158 
159 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
160 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[i];
161 
162 		if (base_info->is_configured &&
163 		    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
164 			return base_info;
165 		}
166 	}
167 
168 	return NULL;
169 }
170 
171 /* Function declarations */
172 static void	raid_bdev_examine(struct spdk_bdev *bdev);
173 static int	raid_bdev_init(void);
174 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
175 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
176 
177 static void
178 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
179 {
180 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
181 
182 	if (raid_ch->process.target_ch != NULL) {
183 		spdk_put_io_channel(raid_ch->process.target_ch);
184 		raid_ch->process.target_ch = NULL;
185 	}
186 
187 	if (raid_ch->process.ch_processed != NULL) {
188 		free(raid_ch->process.ch_processed->base_channel);
189 		free(raid_ch->process.ch_processed);
190 		raid_ch->process.ch_processed = NULL;
191 	}
192 }
193 
/*
 * Set up the process-related part of a raid IO channel: an IO channel for the
 * process target, plus a shadow "processed" channel in which the target's slot
 * is populated. The shadow channel is used for IO to the already processed
 * range. Returns 0 on success, -ENOMEM on any channel/allocation failure.
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* The shadow channel mirrors the regular one, except that the target's
	 * slot points at the target's own IO channel. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The shadow channel itself never has an active process window */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	/* Releases whatever was set up so far and invalidates process.offset */
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
243 
244 /*
245  * brief:
246  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
247  * hierarchy from raid bdev to base bdev io channels. It will be called per core
248  * params:
249  * io_device - pointer to raid bdev io device represented by raid_bdev
250  * ctx_buf - pointer to context buffer for raid bdev io channel
251  * returns:
252  * 0 - success
253  * non zero - failure
254  */
255 static int
256 raid_bdev_create_cb(void *io_device, void *ctx_buf)
257 {
258 	struct raid_bdev            *raid_bdev = io_device;
259 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
260 	uint8_t i;
261 	int ret = -ENOMEM;
262 
263 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);
264 
265 	assert(raid_bdev != NULL);
266 	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
267 
268 	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
269 	if (!raid_ch->base_channel) {
270 		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
271 		return -ENOMEM;
272 	}
273 
274 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
275 		/*
276 		 * Get the spdk_io_channel for all the base bdevs. This is used during
277 		 * split logic to send the respective child bdev ios to respective base
278 		 * bdev io channel.
279 		 * Skip missing base bdevs and the process target, which should also be treated as
280 		 * missing until the process completes.
281 		 */
282 		if (raid_bdev->base_bdev_info[i].is_configured == false ||
283 		    raid_bdev->base_bdev_info[i].is_process_target == true) {
284 			continue;
285 		}
286 		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
287 						   raid_bdev->base_bdev_info[i].desc);
288 		if (!raid_ch->base_channel[i]) {
289 			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
290 			goto err;
291 		}
292 	}
293 
294 	if (raid_bdev->module->get_io_channel) {
295 		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
296 		if (!raid_ch->module_channel) {
297 			SPDK_ERRLOG("Unable to create io channel for raid module\n");
298 			goto err;
299 		}
300 	}
301 
302 	if (raid_bdev->process != NULL) {
303 		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
304 		if (ret != 0) {
305 			SPDK_ERRLOG("Failed to setup process io channel\n");
306 			goto err;
307 		}
308 	} else {
309 		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
310 	}
311 
312 	return 0;
313 err:
314 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
315 		if (raid_ch->base_channel[i] != NULL) {
316 			spdk_put_io_channel(raid_ch->base_channel[i]);
317 		}
318 	}
319 	free(raid_ch->base_channel);
320 
321 	raid_bdev_ch_process_cleanup(raid_ch);
322 
323 	return ret;
324 }
325 
326 /*
327  * brief:
328  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
329  * hierarchy from raid bdev to base bdev io channels. It will be called per core
330  * params:
331  * io_device - pointer to raid bdev io device represented by raid_bdev
332  * ctx_buf - pointer to context buffer for raid bdev io channel
333  * returns:
334  * none
335  */
336 static void
337 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
338 {
339 	struct raid_bdev *raid_bdev = io_device;
340 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
341 	uint8_t i;
342 
343 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
344 
345 	assert(raid_ch != NULL);
346 	assert(raid_ch->base_channel);
347 
348 	if (raid_ch->module_channel) {
349 		spdk_put_io_channel(raid_ch->module_channel);
350 	}
351 
352 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
353 		/* Free base bdev channels */
354 		if (raid_ch->base_channel[i] != NULL) {
355 			spdk_put_io_channel(raid_ch->base_channel[i]);
356 		}
357 	}
358 	free(raid_ch->base_channel);
359 	raid_ch->base_channel = NULL;
360 
361 	raid_bdev_ch_process_cleanup(raid_ch);
362 }
363 
364 /*
365  * brief:
366  * raid_bdev_cleanup is used to cleanup raid_bdev related data
367  * structures.
368  * params:
369  * raid_bdev - pointer to raid_bdev
370  * returns:
371  * none
372  */
373 static void
374 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
375 {
376 	struct raid_base_bdev_info *base_info;
377 
378 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
379 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
380 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
381 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
382 
383 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
384 		assert(base_info->desc == NULL);
385 		free(base_info->name);
386 	}
387 
388 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
389 }
390 
/* Release the memory owned by a raid_bdev. The raid_bdev must already have
 * been removed from the global list (see raid_bdev_cleanup()). */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_free_superblock(raid_bdev);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
399 
/* Remove the raid_bdev from the global list and free it. */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
406 
407 static void
408 raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
409 {
410 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
411 
412 	assert(base_info->is_configured);
413 	assert(raid_bdev->num_base_bdevs_discovered);
414 	raid_bdev->num_base_bdevs_discovered--;
415 	base_info->is_configured = false;
416 	base_info->is_process_target = false;
417 }
418 
/*
 * brief:
 * free resource of base bdev for raid bdev
 * params:
 * base_info - raid base bdev info
 * returns:
 * none
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* Keep the uuid while still CONFIGURING so the slot can be matched again */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}
	base_info->is_failed = false;

	/* Nothing below applies if the base bdev was never opened */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release the claim before closing the descriptor */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		raid_bdev_deconfigure_base_bdev(base_info);
	}
}
455 
456 static void
457 raid_bdev_io_device_unregister_cb(void *io_device)
458 {
459 	struct raid_bdev *raid_bdev = io_device;
460 
461 	if (raid_bdev->num_base_bdevs_discovered == 0) {
462 		/* Free raid_bdev when there are no base bdevs left */
463 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
464 		raid_bdev_cleanup(raid_bdev);
465 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
466 		raid_bdev_free(raid_bdev);
467 	} else {
468 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
469 	}
470 }
471 
472 void
473 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
474 {
475 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
476 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
477 	}
478 }
479 
480 static void
481 _raid_bdev_destruct(void *ctxt)
482 {
483 	struct raid_bdev *raid_bdev = ctxt;
484 	struct raid_base_bdev_info *base_info;
485 
486 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");
487 
488 	assert(raid_bdev->process == NULL);
489 
490 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
491 		/*
492 		 * Close all base bdev descriptors for which call has come from below
493 		 * layers.  Also close the descriptors if we have started shutdown.
494 		 */
495 		if (g_shutdown_started || base_info->remove_scheduled == true) {
496 			raid_bdev_free_base_bdev_resource(base_info);
497 		}
498 	}
499 
500 	if (g_shutdown_started) {
501 		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
502 	}
503 
504 	if (raid_bdev->module->stop != NULL) {
505 		if (raid_bdev->module->stop(raid_bdev) == false) {
506 			return;
507 		}
508 	}
509 
510 	raid_bdev_module_stop_done(raid_bdev);
511 }
512 
513 static int
514 raid_bdev_destruct(void *ctx)
515 {
516 	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);
517 
518 	return 1;
519 }
520 
521 int
522 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
523 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
524 {
525 	struct spdk_dif_ctx dif_ctx;
526 	struct spdk_dif_error err_blk = {};
527 	int rc;
528 	struct spdk_dif_ctx_init_ext_opts dif_opts;
529 	struct iovec md_iov = {
530 		.iov_base	= md_buf,
531 		.iov_len	= num_blocks * bdev->md_len,
532 	};
533 
534 	if (md_buf == NULL) {
535 		return 0;
536 	}
537 
538 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
539 	dif_opts.dif_pi_format = bdev->dif_pi_format;
540 	rc = spdk_dif_ctx_init(&dif_ctx,
541 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
542 			       bdev->dif_is_head_of_md, bdev->dif_type,
543 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
544 			       0, 0, 0, 0, 0, &dif_opts);
545 	if (rc != 0) {
546 		SPDK_ERRLOG("Initialization of DIF context failed\n");
547 		return rc;
548 	}
549 
550 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
551 
552 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
553 	if (rc != 0) {
554 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
555 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
556 	}
557 
558 	return rc;
559 }
560 
561 int
562 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
563 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
564 {
565 	struct spdk_dif_ctx dif_ctx;
566 	struct spdk_dif_error err_blk = {};
567 	int rc;
568 	struct spdk_dif_ctx_init_ext_opts dif_opts;
569 	struct iovec md_iov = {
570 		.iov_base	= md_buf,
571 		.iov_len	= num_blocks * bdev->md_len,
572 	};
573 
574 	if (md_buf == NULL) {
575 		return 0;
576 	}
577 
578 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
579 	dif_opts.dif_pi_format = bdev->dif_pi_format;
580 	rc = spdk_dif_ctx_init(&dif_ctx,
581 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
582 			       bdev->dif_is_head_of_md, bdev->dif_type,
583 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
584 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
585 	if (rc != 0) {
586 		SPDK_ERRLOG("Initialization of DIF context failed\n");
587 		return rc;
588 	}
589 
590 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
591 	if (rc != 0) {
592 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
593 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
594 	}
595 
596 	return rc;
597 }
598 
/*
 * Complete a raid IO. For split IOs (see raid_bdev_io_split()) this first
 * handles the completion of the upper (unprocessed-range) half: on success it
 * restores the lower half's parameters from the parent bdev_io and resubmits
 * it on the "processed" channel; only when both halves are done (or the first
 * failed) is the parent bdev_io completed.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original start of the IO from the parent bdev_io */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* Number of iovs consumed by the first part */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The iov that straddled the split point: shrink it back to
					 * cover only the lower part (length = advanced base - original base) */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The lower LBAs lie in the processed range - use the shadow channel */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Both parts done (or first part failed): restore the full IO description */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		/* Successful reads on DIF-enabled bdevs with reftag checking need their
		 * reference tags remapped back to the raid bdev's offset */
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
662 
663 /*
664  * brief:
665  * raid_bdev_io_complete_part - signal the completion of a part of the expected
666  * base bdev IOs and complete the raid_io if this is the final expected IO.
667  * The caller should first set raid_io->base_bdev_io_remaining. This function
668  * will decrement this counter by the value of the 'completed' parameter and
669  * complete the raid_io if the counter reaches 0. The caller is free to
670  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
671  * it can represent e.g. blocks or IOs.
672  * params:
673  * raid_io - pointer to raid_bdev_io
674  * completed - the part of the raid_io that has been completed
675  * status - status of the base IO
676  * returns:
677  * true - if the raid_io is completed
678  * false - otherwise
679  */
680 bool
681 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
682 			   enum spdk_bdev_io_status status)
683 {
684 	assert(raid_io->base_bdev_io_remaining >= completed);
685 	raid_io->base_bdev_io_remaining -= completed;
686 
687 	if (status != raid_io->base_bdev_io_status_default) {
688 		raid_io->base_bdev_io_status = status;
689 	}
690 
691 	if (raid_io->base_bdev_io_remaining == 0) {
692 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
693 		return true;
694 	} else {
695 		return false;
696 	}
697 }
698 
699 /*
700  * brief:
701  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
702  * It will try to queue the IOs after storing the context to bdev wait queue logic.
703  * params:
704  * raid_io - pointer to raid_bdev_io
705  * bdev - the block device that the IO is submitted to
706  * ch - io channel
707  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
708  * returns:
709  * none
710  */
711 void
712 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
713 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
714 {
715 	raid_io->waitq_entry.bdev = bdev;
716 	raid_io->waitq_entry.cb_fn = cb_fn;
717 	raid_io->waitq_entry.cb_arg = raid_io;
718 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
719 }
720 
721 static void
722 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
723 {
724 	struct raid_bdev_io *raid_io = cb_arg;
725 
726 	spdk_bdev_free_io(bdev_io);
727 
728 	raid_bdev_io_complete_part(raid_io, 1, success ?
729 				   SPDK_BDEV_IO_STATUS_SUCCESS :
730 				   SPDK_BDEV_IO_STATUS_FAILED);
731 }
732 
733 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
734 
/* Trampoline with the spdk_bdev_io_wait_cb signature, used to retry a reset
 * submission that previously failed with -ENOMEM. */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	struct raid_bdev_io *raid_io = _raid_io;

	raid_bdev_submit_reset_request(raid_io);
}
742 
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* First invocation for this IO: expect one completion per base bdev.
	 * On re-entry (after an -ENOMEM requeue) the counter is already set. */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from where the previous attempt stopped */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev: count it as a successful completion */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Retry the remaining submissions once a bdev_io is available */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
792 
/*
 * Split a raid IO at split_offset blocks: the raid_io is advanced to describe
 * only the upper part (offset_blocks + split_offset onward), which is submitted
 * first; raid_io->split records what is needed to later restore and submit the
 * lower part (see raid_bdev_io_complete()).
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Advance the IO description past the lower part */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Walk the iovs to find where the split point falls */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iov boundary - nothing to modify */
				raid_io->split.iov = NULL;
			} else {
				/* This iov straddles the split point: save a copy so it can be
				 * restored, then advance it to cover only the upper part */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
830 
/*
 * Submit a read/write raid IO to the raid module, taking an active background
 * process into account: IO entirely below the process offset goes to the
 * "processed" channel, IO straddling the offset is split first.
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
863 
864 /*
865  * brief:
866  * Callback function to spdk_bdev_io_get_buf.
867  * params:
868  * ch - pointer to raid bdev io channel
869  * bdev_io - pointer to parent bdev_io on raid bdev device
870  * success - True if buffer is allocated or false otherwise.
871  * returns:
872  * none
873  */
874 static void
875 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
876 		     bool success)
877 {
878 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
879 
880 	if (!success) {
881 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
882 		return;
883 	}
884 
885 	raid_bdev_submit_rw_request(raid_io);
886 }
887 
888 void
889 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
890 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
891 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
892 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
893 {
894 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
895 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
896 
897 	raid_io->type = type;
898 	raid_io->offset_blocks = offset_blocks;
899 	raid_io->num_blocks = num_blocks;
900 	raid_io->iovs = iovs;
901 	raid_io->iovcnt = iovcnt;
902 	raid_io->memory_domain = memory_domain;
903 	raid_io->memory_domain_ctx = memory_domain_ctx;
904 	raid_io->md_buf = md_buf;
905 
906 	raid_io->raid_bdev = raid_bdev;
907 	raid_io->raid_ch = raid_ch;
908 	raid_io->base_bdev_io_remaining = 0;
909 	raid_io->base_bdev_io_submitted = 0;
910 	raid_io->completion_cb = NULL;
911 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
912 
913 	raid_bdev_io_set_default_status(raid_io, SPDK_BDEV_IO_STATUS_SUCCESS);
914 }
915 
/*
 * brief:
 * raid_bdev_submit_request function is the submit_request function pointer of
 * raid bdev function table. This is used to submit the io on raid_bdev to below
 * layers.
 * params:
 * ch - pointer to raid bdev io channel
 * bdev_io - pointer to parent bdev_io on raid bdev device
 * returns:
 * none
 */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need a data buffer allocated before submission */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload IOs are rejected while a background process is running */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
966 
967 /*
968  * brief:
969  * _raid_bdev_io_type_supported checks whether io_type is supported in
970  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
971  * doesn't support, the raid device doesn't supports.
972  *
973  * params:
974  * raid_bdev - pointer to raid bdev context
975  * io_type - io type
976  * returns:
977  * true - io_type is supported
978  * false - io_type is not supported
979  */
980 inline static bool
981 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
982 {
983 	struct raid_base_bdev_info *base_info;
984 
985 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
986 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
987 		if (raid_bdev->module->submit_null_payload_request == NULL) {
988 			return false;
989 		}
990 	}
991 
992 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
993 		if (base_info->desc == NULL) {
994 			continue;
995 		}
996 
997 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
998 			return false;
999 		}
1000 	}
1001 
1002 	return true;
1003 }
1004 
1005 /*
1006  * brief:
1007  * raid_bdev_io_type_supported is the io_supported function for bdev function
1008  * table which returns whether the particular io type is supported or not by
1009  * raid bdev module
1010  * params:
1011  * ctx - pointer to raid bdev context
1012  * type - io type
1013  * returns:
1014  * true - io_type is supported
1015  * false - io_type is not supported
1016  */
1017 static bool
1018 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1019 {
1020 	switch (io_type) {
1021 	case SPDK_BDEV_IO_TYPE_READ:
1022 	case SPDK_BDEV_IO_TYPE_WRITE:
1023 		return true;
1024 
1025 	case SPDK_BDEV_IO_TYPE_FLUSH:
1026 	case SPDK_BDEV_IO_TYPE_RESET:
1027 	case SPDK_BDEV_IO_TYPE_UNMAP:
1028 		return _raid_bdev_io_type_supported(ctx, io_type);
1029 
1030 	default:
1031 		return false;
1032 	}
1033 
1034 	return false;
1035 }
1036 
1037 /*
1038  * brief:
1039  * raid_bdev_get_io_channel is the get_io_channel function table pointer for
1040  * raid bdev. This is used to return the io channel for this raid bdev
1041  * params:
1042  * ctxt - pointer to raid_bdev
1043  * returns:
1044  * pointer to io channel for raid bdev
1045  */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev pointer itself is the registered io_device. */
	return spdk_get_io_channel(ctxt);
}
1053 
1054 void
1055 raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
1056 {
1057 	struct raid_base_bdev_info *base_info;
1058 
1059 	assert(raid_bdev != NULL);
1060 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1061 
1062 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1063 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1064 	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
1065 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1066 	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
1067 	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
1068 	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
1069 	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
1070 				     raid_bdev->num_base_bdevs_operational);
1071 	if (raid_bdev->process) {
1072 		struct raid_bdev_process *process = raid_bdev->process;
1073 		uint64_t offset = process->window_offset;
1074 
1075 		spdk_json_write_named_object_begin(w, "process");
1076 		spdk_json_write_name(w, "type");
1077 		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
1078 		spdk_json_write_named_string(w, "target", process->target->name);
1079 		spdk_json_write_named_object_begin(w, "progress");
1080 		spdk_json_write_named_uint64(w, "blocks", offset);
1081 		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
1082 		spdk_json_write_object_end(w);
1083 		spdk_json_write_object_end(w);
1084 	}
1085 	spdk_json_write_name(w, "base_bdevs_list");
1086 	spdk_json_write_array_begin(w);
1087 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1088 		spdk_json_write_object_begin(w);
1089 		spdk_json_write_name(w, "name");
1090 		if (base_info->name) {
1091 			spdk_json_write_string(w, base_info->name);
1092 		} else {
1093 			spdk_json_write_null(w);
1094 		}
1095 		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
1096 		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
1097 		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
1098 		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
1099 		spdk_json_write_object_end(w);
1100 	}
1101 	spdk_json_write_array_end(w);
1102 }
1103 
1104 /*
1105  * brief:
1106  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1107  * params:
1108  * ctx - pointer to raid_bdev
1109  * w - pointer to json context
1110  * returns:
1111  * 0 - success
1112  * non zero - failure
1113  */
1114 static int
1115 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1116 {
1117 	struct raid_bdev *raid_bdev = ctx;
1118 
1119 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1120 
1121 	/* Dump the raid bdev configuration related information */
1122 	spdk_json_write_named_object_begin(w, "raid");
1123 	raid_bdev_write_info_json(raid_bdev, w);
1124 	spdk_json_write_object_end(w);
1125 
1126 	return 0;
1127 }
1128 
1129 /*
1130  * brief:
1131  * raid_bdev_write_config_json is the function table pointer for raid bdev
1132  * params:
1133  * bdev - pointer to spdk_bdev
1134  * w - pointer to json context
1135  * returns:
1136  * none
1137  */
1138 static void
1139 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1140 {
1141 	struct raid_bdev *raid_bdev = bdev->ctxt;
1142 	struct raid_base_bdev_info *base_info;
1143 
1144 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1145 
1146 	if (raid_bdev->superblock_enabled) {
1147 		/* raid bdev configuration is stored in the superblock */
1148 		return;
1149 	}
1150 
1151 	spdk_json_write_object_begin(w);
1152 
1153 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1154 
1155 	spdk_json_write_named_object_begin(w, "params");
1156 	spdk_json_write_named_string(w, "name", bdev->name);
1157 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1158 	if (raid_bdev->strip_size_kb != 0) {
1159 		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1160 	}
1161 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1162 
1163 	spdk_json_write_named_array_begin(w, "base_bdevs");
1164 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1165 		if (base_info->name) {
1166 			spdk_json_write_string(w, base_info->name);
1167 		} else {
1168 			char str[32];
1169 
1170 			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
1171 			spdk_json_write_string(w, str);
1172 		}
1173 	}
1174 	spdk_json_write_array_end(w);
1175 	spdk_json_write_object_end(w);
1176 
1177 	spdk_json_write_object_end(w);
1178 }
1179 
1180 static int
1181 raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1182 {
1183 	struct raid_bdev *raid_bdev = ctx;
1184 	struct raid_base_bdev_info *base_info;
1185 	int domains_count = 0, rc = 0;
1186 
1187 	if (raid_bdev->module->memory_domains_supported == false) {
1188 		return 0;
1189 	}
1190 
1191 	/* First loop to get the number of memory domains */
1192 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1193 		if (base_info->is_configured == false) {
1194 			continue;
1195 		}
1196 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
1197 		if (rc < 0) {
1198 			return rc;
1199 		}
1200 		domains_count += rc;
1201 	}
1202 
1203 	if (!domains || array_size < domains_count) {
1204 		return domains_count;
1205 	}
1206 
1207 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1208 		if (base_info->is_configured == false) {
1209 			continue;
1210 		}
1211 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
1212 		if (rc < 0) {
1213 			return rc;
1214 		}
1215 		domains += rc;
1216 		array_size -= rc;
1217 	}
1218 
1219 	return domains_count;
1220 }
1221 
/* g_raid_bdev_fn_table is the function table for raid bdev */
/* Callbacks the generic bdev layer invokes on every raid bdev instance. */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1232 
1233 struct raid_bdev *
1234 raid_bdev_find_by_name(const char *name)
1235 {
1236 	struct raid_bdev *raid_bdev;
1237 
1238 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1239 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1240 			return raid_bdev;
1241 		}
1242 	}
1243 
1244 	return NULL;
1245 }
1246 
1247 static struct raid_bdev *
1248 raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1249 {
1250 	struct raid_bdev *raid_bdev;
1251 
1252 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1253 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1254 			return raid_bdev;
1255 		}
1256 	}
1257 
1258 	return NULL;
1259 }
1260 
/* Accepted (case-insensitive) name strings for each raid level.
 * The table is terminated by a zeroed sentinel entry (NULL name). */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1274 
/* Printable name for each raid bdev state, indexed by enum raid_bdev_state. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};
1281 
/* Printable name for each background process type, indexed by enum raid_process_type. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1287 
/* We have to use the typedef in the function declaration to appease astyle.
 * Used only for the return types of the str_to_* conversion helpers below. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1291 
1292 raid_level_t
1293 raid_bdev_str_to_level(const char *str)
1294 {
1295 	unsigned int i;
1296 
1297 	assert(str != NULL);
1298 
1299 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1300 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1301 			return g_raid_level_names[i].value;
1302 		}
1303 	}
1304 
1305 	return INVALID_RAID_LEVEL;
1306 }
1307 
1308 const char *
1309 raid_bdev_level_to_str(enum raid_level level)
1310 {
1311 	unsigned int i;
1312 
1313 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1314 		if (g_raid_level_names[i].value == level) {
1315 			return g_raid_level_names[i].name;
1316 		}
1317 	}
1318 
1319 	return "";
1320 }
1321 
1322 raid_bdev_state_t
1323 raid_bdev_str_to_state(const char *str)
1324 {
1325 	unsigned int i;
1326 
1327 	assert(str != NULL);
1328 
1329 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1330 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1331 			break;
1332 		}
1333 	}
1334 
1335 	return i;
1336 }
1337 
1338 const char *
1339 raid_bdev_state_to_str(enum raid_bdev_state state)
1340 {
1341 	if (state >= RAID_BDEV_STATE_MAX) {
1342 		return "";
1343 	}
1344 
1345 	return g_raid_state_names[state];
1346 }
1347 
1348 const char *
1349 raid_bdev_process_to_str(enum raid_process_type value)
1350 {
1351 	if (value >= RAID_PROCESS_MAX) {
1352 		return "";
1353 	}
1354 
1355 	return g_raid_process_type_names[value];
1356 }
1357 
1358 /*
1359  * brief:
1360  * raid_bdev_fini_start is called when bdev layer is starting the
1361  * shutdown process
1362  * params:
1363  * none
1364  * returns:
1365  * none
1366  */
1367 static void
1368 raid_bdev_fini_start(void)
1369 {
1370 	struct raid_bdev *raid_bdev;
1371 	struct raid_base_bdev_info *base_info;
1372 
1373 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1374 
1375 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1376 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1377 			RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1378 				raid_bdev_free_base_bdev_resource(base_info);
1379 			}
1380 		}
1381 	}
1382 
1383 	g_shutdown_started = true;
1384 }
1385 
1386 /*
1387  * brief:
1388  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1389  * params:
1390  * none
1391  * returns:
1392  * none
1393  */
1394 static void
1395 raid_bdev_exit(void)
1396 {
1397 	struct raid_bdev *raid_bdev, *tmp;
1398 
1399 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1400 
1401 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1402 		raid_bdev_cleanup_and_free(raid_bdev);
1403 	}
1404 }
1405 
/*
 * Emit a "bdev_raid_set_options" RPC entry reflecting the current module-level
 * options, so a saved config restores them on startup.
 */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_named_uint32(w, "process_max_bandwidth_mb_sec",
				     g_opts.process_max_bandwidth_mb_sec);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1421 
/* bdev module config_json callback: write the module-level options. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1429 
1430 /*
1431  * brief:
1432  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1433  * module
1434  * params:
1435  * none
1436  * returns:
1437  * size of spdk_bdev_io context for raid
1438  */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	/* Per-IO context the bdev layer reserves in each spdk_bdev_io. */
	return sizeof(struct raid_bdev_io);
}
1445 
/* Module interface registered with the bdev layer; all callbacks are
 * implemented in this file. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1458 
1459 /*
1460  * brief:
1461  * raid_bdev_init is the initialization function for raid bdev module
1462  * params:
1463  * none
1464  * returns:
1465  * 0 - success
1466  * non zero - failure
1467  */
static int
raid_bdev_init(void)
{
	/* No module-wide initialization is currently needed. */
	return 0;
}
1473 
1474 static int
1475 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1476 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1477 		  struct raid_bdev **raid_bdev_out)
1478 {
1479 	struct raid_bdev *raid_bdev;
1480 	struct spdk_bdev *raid_bdev_gen;
1481 	struct raid_bdev_module *module;
1482 	struct raid_base_bdev_info *base_info;
1483 	uint8_t min_operational;
1484 
1485 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1486 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1487 		return -EINVAL;
1488 	}
1489 
1490 	if (raid_bdev_find_by_name(name) != NULL) {
1491 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1492 		return -EEXIST;
1493 	}
1494 
1495 	if (level == RAID1) {
1496 		if (strip_size != 0) {
1497 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1498 			return -EINVAL;
1499 		}
1500 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1501 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1502 		return -EINVAL;
1503 	}
1504 
1505 	module = raid_bdev_module_find(level);
1506 	if (module == NULL) {
1507 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1508 		return -EINVAL;
1509 	}
1510 
1511 	assert(module->base_bdevs_min != 0);
1512 	if (num_base_bdevs < module->base_bdevs_min) {
1513 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1514 			    module->base_bdevs_min,
1515 			    raid_bdev_level_to_str(level));
1516 		return -EINVAL;
1517 	}
1518 
1519 	switch (module->base_bdevs_constraint.type) {
1520 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1521 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1522 		break;
1523 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1524 		min_operational = module->base_bdevs_constraint.value;
1525 		break;
1526 	case CONSTRAINT_UNSET:
1527 		if (module->base_bdevs_constraint.value != 0) {
1528 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1529 				    (uint8_t)module->base_bdevs_constraint.value, name);
1530 			return -EINVAL;
1531 		}
1532 		min_operational = num_base_bdevs;
1533 		break;
1534 	default:
1535 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1536 			    (uint8_t)module->base_bdevs_constraint.type,
1537 			    raid_bdev_level_to_str(module->level));
1538 		return -EINVAL;
1539 	};
1540 
1541 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1542 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1543 			    raid_bdev_level_to_str(module->level));
1544 		return -EINVAL;
1545 	}
1546 
1547 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1548 	if (!raid_bdev) {
1549 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1550 		return -ENOMEM;
1551 	}
1552 
1553 	raid_bdev->module = module;
1554 	raid_bdev->num_base_bdevs = num_base_bdevs;
1555 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1556 					   sizeof(struct raid_base_bdev_info));
1557 	if (!raid_bdev->base_bdev_info) {
1558 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1559 		raid_bdev_free(raid_bdev);
1560 		return -ENOMEM;
1561 	}
1562 
1563 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1564 		base_info->raid_bdev = raid_bdev;
1565 	}
1566 
1567 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1568 	 * internally and set later.
1569 	 */
1570 	raid_bdev->strip_size = 0;
1571 	raid_bdev->strip_size_kb = strip_size;
1572 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1573 	raid_bdev->level = level;
1574 	raid_bdev->min_base_bdevs_operational = min_operational;
1575 	raid_bdev->superblock_enabled = superblock_enabled;
1576 
1577 	raid_bdev_gen = &raid_bdev->bdev;
1578 
1579 	raid_bdev_gen->name = strdup(name);
1580 	if (!raid_bdev_gen->name) {
1581 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1582 		raid_bdev_free(raid_bdev);
1583 		return -ENOMEM;
1584 	}
1585 
1586 	raid_bdev_gen->product_name = "Raid Volume";
1587 	raid_bdev_gen->ctxt = raid_bdev;
1588 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1589 	raid_bdev_gen->module = &g_raid_if;
1590 	raid_bdev_gen->write_cache = 0;
1591 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1592 
1593 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1594 
1595 	*raid_bdev_out = raid_bdev;
1596 
1597 	return 0;
1598 }
1599 
1600 /*
1601  * brief:
1602  * raid_bdev_create allocates raid bdev based on passed configuration
1603  * params:
1604  * name - name for raid bdev
1605  * strip_size - strip size in KB
1606  * num_base_bdevs - number of base bdevs
1607  * level - raid level
1608  * superblock_enabled - true if raid should have superblock
1609  * uuid - uuid to set for the bdev
1610  * raid_bdev_out - the created raid bdev
1611  * returns:
1612  * 0 - success
1613  * non zero - failure
1614  */
int
raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
		 struct raid_bdev **raid_bdev_out)
{
	struct raid_bdev *raid_bdev;
	int rc;

	assert(uuid != NULL);

	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
			       &raid_bdev);
	if (rc != 0) {
		return rc;
	}

	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
		/* we need to have the uuid to store in the superblock before the bdev is registered */
		spdk_uuid_generate(&raid_bdev->bdev.uuid);
	}

	/* A newly created raid starts with all base bdevs counted as operational. */
	raid_bdev->num_base_bdevs_operational = num_base_bdevs;

	*raid_bdev_out = raid_bdev;

	return 0;
}
1642 
/* Close the raid bdev's internal self-descriptor; executed on the app thread. */
static void
_raid_bdev_unregistering_cont(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;

	spdk_bdev_close(raid_bdev->self_desc);
	raid_bdev->self_desc = NULL;
}
1651 
/* Forward the self-descriptor close to the app thread (may run on any thread). */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1657 
1658 static int
1659 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1660 {
1661 	struct raid_process_finish_action *finish_action;
1662 
1663 	assert(spdk_get_thread() == process->thread);
1664 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1665 
1666 	finish_action = calloc(1, sizeof(*finish_action));
1667 	if (finish_action == NULL) {
1668 		return -ENOMEM;
1669 	}
1670 
1671 	finish_action->cb = cb;
1672 	finish_action->cb_ctx = cb_ctx;
1673 
1674 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1675 
1676 	return 0;
1677 }
1678 
/*
 * Runs on the process thread: mark the background process as stopping (with
 * -ECANCELED unless it already carries a status) and schedule the raid bdev
 * unregistration to continue once the process has fully stopped.
 */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	if (process->status == 0) {
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1697 
1698 static void
1699 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1700 {
1701 	struct raid_bdev *raid_bdev = event_ctx;
1702 
1703 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1704 		if (raid_bdev->process != NULL) {
1705 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1706 					     raid_bdev->process);
1707 		} else {
1708 			raid_bdev_unregistering_cont(raid_bdev);
1709 		}
1710 	}
1711 }
1712 
/*
 * Final stage of raid bdev configuration: register the io device and the bdev
 * with the bdev layer, then open an internal descriptor on the new bdev.
 * On any failure the raid bdev is rolled back to CONFIGURING state.
 */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Roll back: stop the module and return to CONFIGURING state. */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1761 
1762 static void
1763 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1764 {
1765 	if (status == 0) {
1766 		raid_bdev_configure_cont(raid_bdev);
1767 	} else {
1768 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1769 			    raid_bdev->bdev.name, spdk_strerror(-status));
1770 		if (raid_bdev->module->stop != NULL) {
1771 			raid_bdev->module->stop(raid_bdev);
1772 		}
1773 	}
1774 }
1775 
1776 /*
1777  * brief:
1778  * If raid bdev config is complete, then only register the raid bdev to
1779  * bdev layer and remove this raid bdev from configuring list and
1780  * insert the raid bdev to configured list
1781  * params:
1782  * raid_bdev - pointer to raid bdev
1783  * returns:
1784  * 0 - success
1785  * non zero - failure
1786  */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);

	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	if (raid_bdev->superblock_enabled) {
		if (raid_bdev->sb == NULL) {
			/* No superblock loaded yet - create a fresh one. */
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			/* An existing superblock was read - sanity-check it
			 * against the assembled raid bdev's geometry. */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		/* Configuration continues asynchronously after the superblock write. */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1845 
1846 /*
1847  * brief:
1848  * If raid bdev is online and registered, change the bdev state to
1849  * configuring and unregister this raid device. Queue this raid device
1850  * in configuring list
1851  * params:
1852  * raid_bdev - pointer to raid bdev
1853  * cb_fn - callback function
1854  * cb_arg - argument to callback function
1855  * returns:
1856  * none
1857  */
static void
raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
		      void *cb_arg)
{
	/* Nothing to do unless the raid bdev is currently online. */
	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
		return;
	}

	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");

	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
}
1874 
1875 /*
1876  * brief:
1877  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1878  * params:
1879  * base_bdev - pointer to base bdev
1880  * returns:
1881  * base bdev info if found, otherwise NULL.
1882  */
1883 static struct raid_base_bdev_info *
1884 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1885 {
1886 	struct raid_bdev *raid_bdev;
1887 	struct raid_base_bdev_info *base_info;
1888 
1889 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1890 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1891 			if (base_info->desc != NULL &&
1892 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1893 				return base_info;
1894 			}
1895 		}
1896 	}
1897 
1898 	return NULL;
1899 }
1900 
/*
 * Complete a base bdev removal: clear the scheduled flag, on success update
 * the operational count (deconfiguring the raid bdev if it drops below the
 * minimum) and invoke the caller's completion callback.
 */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1922 
/* Completion of the unquiesce issued after a base bdev removal; finishes the removal. */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1936 
/*
 * Per-channel step of a base bdev removal: on each thread, release this raid
 * channel's IO channel for the removed base bdev and clear the corresponding
 * slot in the process channel mirror, if one exists.
 */
static void
raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	uint8_t idx = raid_bdev_base_bdev_slot(base_info);

	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);

	if (raid_ch->base_channel[idx] != NULL) {
		spdk_put_io_channel(raid_ch->base_channel[idx]);
		raid_ch->base_channel[idx] = NULL;
	}

	if (raid_ch->process.ch_processed != NULL) {
		raid_ch->process.ch_processed->base_channel[idx] = NULL;
	}

	spdk_for_each_channel_continue(i, 0);
}
1958 
/*
 * All channels have dropped the removed base bdev: free its resources and
 * unquiesce the raid bdev to resume IO.
 */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	raid_bdev_free_base_bdev_resource(base_info);

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1970 
/*
 * Deconfigure the base bdev and propagate its removal to every IO channel
 * before releasing its resources.
 */
static void
raid_bdev_remove_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	raid_bdev_deconfigure_base_bdev(base_info);

	spdk_for_each_channel(base_info->raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
			      raid_bdev_channels_remove_base_bdev_done);
}
1979 
1980 static void
1981 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1982 {
1983 	struct raid_base_bdev_info *base_info = ctx;
1984 
1985 	if (status != 0) {
1986 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1987 			    raid_bdev->bdev.name, spdk_strerror(-status));
1988 		raid_bdev_remove_base_bdev_done(base_info, status);
1989 		return;
1990 	}
1991 
1992 	raid_bdev_remove_base_bdev_cont(base_info);
1993 }
1994 
/*
 * Completion of the quiesce issued before a base bdev removal. With IO paused,
 * mark the base bdev FAILED or MISSING in the superblock (if one exists)
 * before proceeding with the actual removal.
 */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find this slot's CONFIGURED entry in the superblock and
		 * update its state; the removal continues after the write. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				if (base_info->is_failed) {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;
				} else {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_MISSING;
				}

				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}

	/* No superblock (or no matching entry) - continue the removal directly. */
	raid_bdev_remove_base_bdev_cont(base_info);
}
2032 
2033 static int
2034 raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
2035 {
2036 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2037 
2038 	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
2039 				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
2040 }
2041 
/* Message context for removing a base bdev while a background process runs. */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The running background process of the raid bdev */
	struct raid_bdev_process *process;
	/* The base bdev scheduled for removal */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of the operational base bdev count, taken on the app thread */
	uint8_t num_base_bdevs_operational;
};
2047 
/* App thread: quiesce the raid bdev and carry out the base bdev removal. */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc = raid_bdev_remove_base_bdev_quiesce(base_info);

	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2059 
2060 static void
2061 raid_bdev_process_base_bdev_remove_cont(void *_ctx)
2062 {
2063 	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2064 	struct raid_base_bdev_info *base_info = ctx->base_info;
2065 
2066 	free(ctx);
2067 
2068 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
2069 			     base_info);
2070 }
2071 
/* Process thread: decide whether removing this base bdev requires stopping the
 * background process. If not, the removal continues right away; otherwise it is
 * queued as a finish action and the process is asked to stop. The ctx is freed
 * by raid_bdev_process_base_bdev_remove_cont() on the continue path. */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the removal until the process has fully stopped. */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		/* Removing the target (or dropping below the minimum) makes the process fail. */
		process->status = -ENODEV;
	}
}
2102 
/* Handle removal of a base bdev while a background process is running.
 * Snapshots the data the process thread needs and forwards the request to it.
 * Returns 0 on success, -ENOMEM if the message context cannot be allocated. */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	/* Note: base_info is reused as the loop cursor; the bdev being removed was saved in ctx above. */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (base_info->is_configured && !base_info->remove_scheduled) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2138 
/*
 * Schedule removal of a base bdev from its raid bdev. Depending on the raid
 * bdev's state this either frees the resources synchronously, deconfigures the
 * whole raid bdev (when it cannot tolerate losing a base bdev), or quiesces
 * I/O and removes the base bdev asynchronously. cb_fn is invoked when the
 * removal completes. Must be called on the app thread.
 * Returns 0 on success (or successful scheduling), negative errno otherwise.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Reject duplicate or pointless removals. */
	if (base_info->remove_scheduled || !base_info->is_configured) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0 &&
		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		/* Asynchronous path: the callback is stored and invoked when removal finishes. */
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		if (raid_bdev->process != NULL) {
			/* A background process is running - it may have to be stopped first. */
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2196 
2197 /*
2198  * brief:
2199  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2200  * is removed. This function checks if this base bdev is part of any raid bdev
2201  * or not. If yes, it takes necessary action on that particular raid bdev.
2202  * params:
2203  * base_bdev - pointer to base bdev which got removed
2204  * cb_fn - callback function
2205  * cb_arg - argument to callback function
2206  * returns:
2207  * 0 - success
2208  * non zero - failure
2209  */
2210 int
2211 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2212 {
2213 	struct raid_base_bdev_info *base_info;
2214 
2215 	/* Find the raid_bdev which has claimed this base_bdev */
2216 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2217 	if (!base_info) {
2218 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2219 		return -ENODEV;
2220 	}
2221 
2222 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2223 }
2224 
2225 static void
2226 raid_bdev_fail_base_remove_cb(void *ctx, int status)
2227 {
2228 	struct raid_base_bdev_info *base_info = ctx;
2229 
2230 	if (status != 0) {
2231 		SPDK_WARNLOG("Failed to remove base bdev %s\n", base_info->name);
2232 		base_info->is_failed = false;
2233 	}
2234 }
2235 
2236 static void
2237 _raid_bdev_fail_base_bdev(void *ctx)
2238 {
2239 	struct raid_base_bdev_info *base_info = ctx;
2240 	int rc;
2241 
2242 	if (base_info->is_failed) {
2243 		return;
2244 	}
2245 	base_info->is_failed = true;
2246 
2247 	SPDK_NOTICELOG("Failing base bdev in slot %d ('%s') of raid bdev '%s'\n",
2248 		       raid_bdev_base_bdev_slot(base_info), base_info->name, base_info->raid_bdev->bdev.name);
2249 
2250 	rc = _raid_bdev_remove_base_bdev(base_info, raid_bdev_fail_base_remove_cb, base_info);
2251 	if (rc != 0) {
2252 		raid_bdev_fail_base_remove_cb(base_info, rc);
2253 	}
2254 }
2255 
/* Mark a base bdev as failed and remove it from its raid bdev. May be called
 * from any thread - the work is executed on the app thread. */
void
raid_bdev_fail_base_bdev(struct raid_base_bdev_info *base_info)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_fail_base_bdev, base_info);
}
2261 
2262 static void
2263 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2264 {
2265 	if (status != 0) {
2266 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2267 			    raid_bdev->bdev.name, spdk_strerror(-status));
2268 	}
2269 }
2270 
2271 /*
2272  * brief:
2273  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2274  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2275  * If yes, call module handler to resize the raid_bdev if implemented.
2276  * params:
2277  * base_bdev - pointer to base bdev which got resized.
2278  * returns:
2279  * none
2280  */
2281 static void
2282 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2283 {
2284 	struct raid_bdev *raid_bdev;
2285 	struct raid_base_bdev_info *base_info;
2286 	uint64_t blockcnt_old;
2287 
2288 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2289 
2290 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2291 
2292 	/* Find the raid_bdev which has claimed this base_bdev */
2293 	if (!base_info) {
2294 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2295 		return;
2296 	}
2297 	raid_bdev = base_info->raid_bdev;
2298 
2299 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2300 
2301 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2302 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2303 
2304 	base_info->blockcnt = base_bdev->blockcnt;
2305 
2306 	if (!raid_bdev->module->resize) {
2307 		return;
2308 	}
2309 
2310 	blockcnt_old = raid_bdev->bdev.blockcnt;
2311 	if (raid_bdev->module->resize(raid_bdev) == false) {
2312 		return;
2313 	}
2314 
2315 	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
2316 		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);
2317 
2318 	if (raid_bdev->superblock_enabled) {
2319 		struct raid_bdev_superblock *sb = raid_bdev->sb;
2320 		uint8_t i;
2321 
2322 		for (i = 0; i < sb->base_bdevs_size; i++) {
2323 			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
2324 
2325 			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
2326 				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2327 				sb_base_bdev->data_size = base_info->data_size;
2328 			}
2329 		}
2330 		sb->raid_size = raid_bdev->bdev.blockcnt;
2331 		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
2332 	}
2333 }
2334 
2335 /*
2336  * brief:
2337  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2338  * triggers asynchronous event.
2339  * params:
2340  * type - event details.
2341  * bdev - bdev that triggered event.
2342  * event_ctx - context for event.
2343  * returns:
2344  * none
2345  */
2346 static void
2347 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2348 			  void *event_ctx)
2349 {
2350 	int rc;
2351 
2352 	switch (type) {
2353 	case SPDK_BDEV_EVENT_REMOVE:
2354 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2355 		if (rc != 0) {
2356 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2357 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2358 		}
2359 		break;
2360 	case SPDK_BDEV_EVENT_RESIZE:
2361 		raid_bdev_resize_base_bdev(bdev);
2362 		break;
2363 	default:
2364 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2365 		break;
2366 	}
2367 }
2368 
2369 /*
2370  * brief:
2371  * Deletes the specified raid bdev
2372  * params:
2373  * raid_bdev - pointer to raid bdev
2374  * cb_fn - callback function
2375  * cb_arg - argument to callback function
2376  */
2377 void
2378 raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
2379 {
2380 	struct raid_base_bdev_info *base_info;
2381 
2382 	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
2383 
2384 	if (raid_bdev->destroy_started) {
2385 		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
2386 			      raid_bdev->bdev.name);
2387 		if (cb_fn) {
2388 			cb_fn(cb_arg, -EALREADY);
2389 		}
2390 		return;
2391 	}
2392 
2393 	raid_bdev->destroy_started = true;
2394 
2395 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2396 		base_info->remove_scheduled = true;
2397 
2398 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2399 			/*
2400 			 * As raid bdev is not registered yet or already unregistered,
2401 			 * so cleanup should be done here itself.
2402 			 */
2403 			raid_bdev_free_base_bdev_resource(base_info);
2404 		}
2405 	}
2406 
2407 	if (raid_bdev->num_base_bdevs_discovered == 0) {
2408 		/* There is no base bdev for this raid, so free the raid device. */
2409 		raid_bdev_cleanup_and_free(raid_bdev);
2410 		if (cb_fn) {
2411 			cb_fn(cb_arg, 0);
2412 		}
2413 	} else {
2414 		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
2415 	}
2416 }
2417 
2418 static void
2419 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2420 {
2421 	if (status != 0) {
2422 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2423 			    raid_bdev->bdev.name, spdk_strerror(-status));
2424 	}
2425 }
2426 
/* App thread: after a successful background process, promote superblock
 * entries whose base bdevs became configured to CONFIGURED and persist the
 * superblock. */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				/* Record the (possibly new) base bdev's uuid in the entry. */
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
2451 
2452 static void raid_bdev_process_free(struct raid_bdev_process *process);
2453 
2454 static void
2455 _raid_bdev_process_finish_done(void *ctx)
2456 {
2457 	struct raid_bdev_process *process = ctx;
2458 	struct raid_process_finish_action *finish_action;
2459 
2460 	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
2461 		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
2462 		finish_action->cb(finish_action->cb_ctx);
2463 		free(finish_action);
2464 	}
2465 
2466 	spdk_poller_unregister(&process->qos.process_continue_poller);
2467 
2468 	raid_bdev_process_free(process);
2469 
2470 	spdk_thread_exit(spdk_get_thread());
2471 }
2472 
2473 static void
2474 raid_bdev_process_finish_target_removed(void *ctx, int status)
2475 {
2476 	struct raid_bdev_process *process = ctx;
2477 
2478 	if (status != 0) {
2479 		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
2480 	}
2481 
2482 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2483 }
2484 
2485 static void
2486 raid_bdev_process_finish_unquiesced(void *ctx, int status)
2487 {
2488 	struct raid_bdev_process *process = ctx;
2489 
2490 	if (status != 0) {
2491 		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
2492 	}
2493 
2494 	if (process->status != 0) {
2495 		status = _raid_bdev_remove_base_bdev(process->target, raid_bdev_process_finish_target_removed,
2496 						     process);
2497 		if (status != 0) {
2498 			raid_bdev_process_finish_target_removed(process, status);
2499 		}
2500 		return;
2501 	}
2502 
2503 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2504 }
2505 
2506 static void
2507 raid_bdev_process_finish_unquiesce(void *ctx)
2508 {
2509 	struct raid_bdev_process *process = ctx;
2510 	int rc;
2511 
2512 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2513 				 raid_bdev_process_finish_unquiesced, process);
2514 	if (rc != 0) {
2515 		raid_bdev_process_finish_unquiesced(process, rc);
2516 	}
2517 }
2518 
/* Process thread: all channels are cleaned up. Release the process's raid
 * channel, report the result, optionally schedule a superblock update, and
 * continue the finish sequence on the app thread. */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		/* The superblock is only written on success. */
		if (raid_bdev->superblock_enabled) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2550 
2551 static void
2552 __raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
2553 {
2554 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2555 
2556 	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
2557 }
2558 
/* Per-channel cleanup when the process finishes. On success, the channel's
 * process target channel is handed over as the regular base channel for the
 * target's slot. */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* Clear target_ch so the cleanup below does not release it. */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2577 
/* Quiesce completion for process finish: detach the process from the raid
 * bdev and clean up the per-channel process state on every thread. */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): the finish sequence is not continued on quiesce failure - confirm intended */
		return;
	}

	raid_bdev->process = NULL;
	process->target->is_process_target = false;

	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2595 
2596 static void
2597 _raid_bdev_process_finish(void *ctx)
2598 {
2599 	struct raid_bdev_process *process = ctx;
2600 	int rc;
2601 
2602 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2603 			       raid_bdev_process_finish_quiesced, process);
2604 	if (rc != 0) {
2605 		raid_bdev_process_finish_quiesced(ctx, rc);
2606 	}
2607 }
2608 
/* Kick off the process finish sequence on the app thread. */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2614 
2615 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2616 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2617 
/* Initiate stopping of the background process (process thread only). The
 * first non-zero status wins; subsequent calls while already stopping are
 * no-ops. */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->window_range_locked) {
		/* Unlock first; the finish continues from the unlock callback. */
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2640 
2641 static void
2642 raid_bdev_process_window_range_unlocked(void *ctx, int status)
2643 {
2644 	struct raid_bdev_process *process = ctx;
2645 
2646 	if (status != 0) {
2647 		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
2648 		raid_bdev_process_finish(process, status);
2649 		return;
2650 	}
2651 
2652 	process->window_range_locked = false;
2653 	process->window_offset += process->window_size;
2654 
2655 	raid_bdev_process_thread_run(process);
2656 }
2657 
2658 static void
2659 raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
2660 {
2661 	int rc;
2662 
2663 	assert(process->window_range_locked == true);
2664 
2665 	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
2666 				       process->window_offset, process->max_window_size,
2667 				       raid_bdev_process_window_range_unlocked, process);
2668 	if (rc != 0) {
2669 		raid_bdev_process_window_range_unlocked(process, rc);
2670 	}
2671 }
2672 
/* All channels have observed the new process offset - release the window. */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(proc);
}
2680 
2681 static void
2682 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2683 {
2684 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2685 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2686 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2687 
2688 	raid_ch->process.offset = process->window_offset + process->window_size;
2689 
2690 	spdk_for_each_channel_continue(i, 0);
2691 }
2692 
/* Completion callback for a background process request (called by the raid
 * module). Returns the request to the free list, accounts the processed
 * blocks, and when the whole window completes either finishes the process on
 * error or publishes the new offset to all channels. */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	/* The request can be reused immediately by further submissions. */
	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2718 
/* Submit one process request covering up to num_blocks at offset_blocks.
 * Returns the number of blocks actually submitted (> 0), 0 if no free request
 * is available or the module cannot submit now, or a negative errno from the
 * module. The request stays on the free list unless it was submitted. */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests are in flight; completions will drain the window. */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may submit fewer blocks than requested. */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2754 
/* Fill the current window with process requests. If nothing could be
 * submitted at all, the window is done (or failed) and the process finishes;
 * otherwise remember the portion actually covered as the window size. */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			/* Out of free requests or submission failed - stop filling. */
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2779 
/* LBA range lock completion: start processing the window, unless the process
 * was asked to stop in the meantime - then release the range again. */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2800 
/* QoS token bucket: refill proportionally to the elapsed ticks (capped at
 * bytes_max) and, if any budget is left, consume one window's worth of bytes.
 * The balance may go negative (debt), which delays the next window. Returns
 * true if the window may proceed now. */
static bool
raid_bdev_process_consume_token(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t now = spdk_get_ticks();

	process->qos.bytes_available = spdk_min(process->qos.bytes_max,
						process->qos.bytes_available +
						(now - process->qos.last_tsc) * process->qos.bytes_per_tsc);
	process->qos.last_tsc = now;
	if (process->qos.bytes_available > 0.0) {
		process->qos.bytes_available -= process->window_size * raid_bdev->bdev.blocklen;
		return true;
	}
	return false;
}
2817 
/* Try to begin the next window: when QoS is enabled, only proceed if a token
 * is available, otherwise arm the continue poller to retry later. Returns
 * true if the window lock was initiated, false if throttled. */
static bool
raid_bdev_process_lock_window_range(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(process->window_range_locked == false);

	if (process->qos.enable_qos) {
		if (raid_bdev_process_consume_token(process)) {
			/* Token granted - the poller is not needed until the next throttle. */
			spdk_poller_pause(process->qos.process_continue_poller);
		} else {
			spdk_poller_resume(process->qos.process_continue_poller);
			return false;
		}
	}

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
	return true;
}
2843 
2844 static int
2845 raid_bdev_process_continue_poll(void *arg)
2846 {
2847 	struct raid_bdev_process *process = arg;
2848 
2849 	if (raid_bdev_process_lock_window_range(process)) {
2850 		return SPDK_POLLER_BUSY;
2851 	}
2852 	return SPDK_POLLER_IDLE;
2853 }
2854 
/* Main loop step of the process thread, entered between windows (no requests
 * in flight, no range locked). Finishes the process when stopping or when the
 * end of the bdev is reached, otherwise starts the next window. */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the last window to the end of the bdev. */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);
	raid_bdev_process_lock_window_range(process);
}
2879 
/* First message on the newly created process thread: acquire the raid I/O
 * channel, set up optional QoS throttling, and start processing. */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	if (process->qos.enable_qos) {
		/* Poller is paused until the process is actually throttled. */
		process->qos.process_continue_poller = SPDK_POLLER_REGISTER(raid_bdev_process_continue_poll,
						       process, 0);
		spdk_poller_pause(process->qos.process_continue_poller);
	}

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2910 
2911 static void
2912 raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
2913 {
2914 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2915 
2916 	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
2917 	raid_bdev_process_free(process);
2918 
2919 	/* TODO: update sb */
2920 }
2921 
/* Per-channel abort of a process that failed to start: undo the process
 * channel setup. */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *channel = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2932 
/* All channels finished their process setup. Validate that the target is
 * still usable, create the dedicated process thread, and attach the process
 * to the raid bdev; on any failure, abort the setup on every channel. */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status == 0 &&
	    (process->target->remove_scheduled || !process->target->is_configured ||
	     raid_bdev->num_base_bdevs_operational <= raid_bdev->min_base_bdevs_operational)) {
		/* a base bdev was removed before we got here */
		status = -ENODEV;
	}

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* e.g. "<raid name>_rebuild" */
	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2974 
/* Per-channel setup when a process starts; the setup result is propagated to
 * the completion callback. */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *channel = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(channel);

	spdk_for_each_channel_continue(i, raid_bdev_ch_process_setup(raid_ch, proc));
}
2987 
2988 static void
2989 raid_bdev_process_start(struct raid_bdev_process *process)
2990 {
2991 	struct raid_bdev *raid_bdev = process->raid_bdev;
2992 
2993 	assert(raid_bdev->module->submit_process_request != NULL);
2994 
2995 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
2996 			      raid_bdev_channels_start_process_done);
2997 }
2998 
2999 static void
3000 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
3001 {
3002 	spdk_dma_free(process_req->iov.iov_base);
3003 	spdk_dma_free(process_req->md_buf);
3004 	free(process_req);
3005 }
3006 
3007 static struct raid_bdev_process_request *
3008 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
3009 {
3010 	struct raid_bdev *raid_bdev = process->raid_bdev;
3011 	struct raid_bdev_process_request *process_req;
3012 
3013 	process_req = calloc(1, sizeof(*process_req));
3014 	if (process_req == NULL) {
3015 		return NULL;
3016 	}
3017 
3018 	process_req->process = process;
3019 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
3020 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
3021 	if (process_req->iov.iov_base == NULL) {
3022 		free(process_req);
3023 		return NULL;
3024 	}
3025 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
3026 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
3027 		if (process_req->md_buf == NULL) {
3028 			raid_bdev_process_request_free(process_req);
3029 			return NULL;
3030 		}
3031 	}
3032 
3033 	return process_req;
3034 }
3035 
3036 static void
3037 raid_bdev_process_free(struct raid_bdev_process *process)
3038 {
3039 	struct raid_bdev_process_request *process_req;
3040 
3041 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
3042 		TAILQ_REMOVE(&process->requests, process_req, link);
3043 		raid_bdev_process_request_free(process_req);
3044 	}
3045 
3046 	free(process);
3047 }
3048 
/*
 * Allocate and initialize a background process (e.g. rebuild) for the raid bdev.
 * Pre-allocates RAID_BDEV_PROCESS_MAX_QD requests, each covering one process
 * window, and sets up optional bandwidth limiting from the module options.
 * Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	/* Window size in blocks, at least one write unit */
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	if (g_opts.process_max_bandwidth_mb_sec != 0) {
		/* Token-bucket style rate limiting: bytes_per_tsc credits accrue per tick,
		 * capped at bytes_max (one millisecond worth of the configured bandwidth). */
		process->qos.enable_qos = true;
		process->qos.last_tsc = spdk_get_ticks();
		process->qos.bytes_per_tsc = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 /
					     spdk_get_ticks_hz();
		process->qos.bytes_max = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 / SPDK_SEC_TO_MSEC;
		process->qos.bytes_available = 0.0;
	}

	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
3092 
3093 static int
3094 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
3095 {
3096 	struct raid_bdev_process *process;
3097 
3098 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3099 
3100 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
3101 	if (process == NULL) {
3102 		return -ENOMEM;
3103 	}
3104 
3105 	raid_bdev_process_start(process);
3106 
3107 	return 0;
3108 }
3109 
3110 static void raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info);
3111 
/* Resume base bdev configuration after the channel sync iteration completes. */
static void
_raid_bdev_configure_base_bdev_cont(struct spdk_io_channel_iter *i, int status)
{
	raid_bdev_configure_base_bdev_cont(spdk_io_channel_iter_get_ctx(i));
}
3119 
/* No per-channel work - the iteration only acts as a synchronization point. */
static void
raid_bdev_ch_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
3125 
/*
 * Continue configuration of a base bdev after it has been opened and claimed.
 * Depending on the raid bdev's state this either completes the raid bdev
 * configuration, starts a rebuild targeting this base bdev, or just marks it
 * configured. Invokes base_info->configure_cb (if set) with the result.
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational &&
	    base_info->is_process_target == false) {
		/* TODO: defer if rebuild in progress on another base bdev */
		assert(raid_bdev->process == NULL);
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		base_info->is_process_target = true;
		/* To assure is_process_target is set before is_configured when checked in raid_bdev_create_cb() */
		spdk_for_each_channel(raid_bdev, raid_bdev_ch_sync, base_info, _raid_bdev_configure_base_bdev_cont);
		return;
	}

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (base_info->is_process_target) {
		/* This base bdev was attached to an already online raid bdev - rebuild onto it. */
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		rc = 0;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
3176 
3177 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3178 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3179 
/*
 * Callback for the superblock read issued for a newly added base bdev in
 * raid_bdev_configure_base_bdev(). If a valid superblock matching this raid
 * bdev is found, the bdev is re-attached via the superblock examine path; if
 * there is no superblock, configuration continues normally. On failure, the
 * configure_cb (if set) is invoked with the error status.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	switch (status) {
	case 0:
		/* valid superblock found */
		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);

			/* Release this slot and let the examine path re-attach the bdev. */
			raid_bdev_free_base_bdev_resource(base_info);
			raid_bdev_examine_sb(sb, bdev, base_info->configure_cb, base_info->configure_cb_ctx);
			return;
		}
		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, status);
	}
}
3214 
3215 static int
3216 raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
3217 			      raid_base_bdev_cb cb_fn, void *cb_ctx)
3218 {
3219 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
3220 	struct spdk_bdev_desc *desc;
3221 	struct spdk_bdev *bdev;
3222 	const struct spdk_uuid *bdev_uuid;
3223 	int rc;
3224 
3225 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3226 	assert(base_info->desc == NULL);
3227 
3228 	/*
3229 	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
3230 	 * before claiming the bdev.
3231 	 */
3232 
3233 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3234 		char uuid_str[SPDK_UUID_STRING_LEN];
3235 		const char *bdev_name;
3236 
3237 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3238 
3239 		/* UUID of a bdev is registered as its alias */
3240 		bdev = spdk_bdev_get_by_name(uuid_str);
3241 		if (bdev == NULL) {
3242 			return -ENODEV;
3243 		}
3244 
3245 		bdev_name = spdk_bdev_get_name(bdev);
3246 
3247 		if (base_info->name == NULL) {
3248 			assert(existing == true);
3249 			base_info->name = strdup(bdev_name);
3250 			if (base_info->name == NULL) {
3251 				return -ENOMEM;
3252 			}
3253 		} else if (strcmp(base_info->name, bdev_name) != 0) {
3254 			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
3255 				    bdev_name, base_info->name);
3256 			return -EINVAL;
3257 		}
3258 	}
3259 
3260 	assert(base_info->name != NULL);
3261 
3262 	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
3263 	if (rc != 0) {
3264 		if (rc != -ENODEV) {
3265 			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
3266 		}
3267 		return rc;
3268 	}
3269 
3270 	bdev = spdk_bdev_desc_get_bdev(desc);
3271 	bdev_uuid = spdk_bdev_get_uuid(bdev);
3272 
3273 	if (spdk_uuid_is_null(&base_info->uuid)) {
3274 		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
3275 	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
3276 		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
3277 		spdk_bdev_close(desc);
3278 		return -EINVAL;
3279 	}
3280 
3281 	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
3282 	if (rc != 0) {
3283 		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
3284 		spdk_bdev_close(desc);
3285 		return rc;
3286 	}
3287 
3288 	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
3289 
3290 	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
3291 	if (base_info->app_thread_ch == NULL) {
3292 		SPDK_ERRLOG("Failed to get io channel\n");
3293 		spdk_bdev_module_release_bdev(bdev);
3294 		spdk_bdev_close(desc);
3295 		return -ENOMEM;
3296 	}
3297 
3298 	base_info->desc = desc;
3299 	base_info->blockcnt = bdev->blockcnt;
3300 
3301 	if (raid_bdev->superblock_enabled) {
3302 		uint64_t data_offset;
3303 
3304 		if (base_info->data_offset == 0) {
3305 			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
3306 			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
3307 		} else {
3308 			data_offset = base_info->data_offset;
3309 		}
3310 
3311 		if (bdev->optimal_io_boundary != 0) {
3312 			data_offset = spdk_divide_round_up(data_offset,
3313 							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
3314 			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
3315 				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
3316 					     base_info->data_offset, base_info->name, data_offset);
3317 				data_offset = base_info->data_offset;
3318 			}
3319 		}
3320 
3321 		base_info->data_offset = data_offset;
3322 	}
3323 
3324 	if (base_info->data_offset >= bdev->blockcnt) {
3325 		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
3326 			    base_info->data_offset, bdev->blockcnt, base_info->name);
3327 		rc = -EINVAL;
3328 		goto out;
3329 	}
3330 
3331 	if (base_info->data_size == 0) {
3332 		base_info->data_size = bdev->blockcnt - base_info->data_offset;
3333 	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
3334 		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
3335 			    bdev->blockcnt, base_info->name);
3336 		rc = -EINVAL;
3337 		goto out;
3338 	}
3339 
3340 	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3341 		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
3342 			    bdev->name);
3343 		rc = -EINVAL;
3344 		goto out;
3345 	}
3346 
3347 	/*
3348 	 * Set the raid bdev properties if this is the first base bdev configured,
3349 	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
3350 	 * have the same blocklen and metadata format.
3351 	 */
3352 	if (raid_bdev->bdev.blocklen == 0) {
3353 		raid_bdev->bdev.blocklen = bdev->blocklen;
3354 		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
3355 		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
3356 		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
3357 		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
3358 		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
3359 		raid_bdev->bdev.dif_pi_format = bdev->dif_pi_format;
3360 	} else {
3361 		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
3362 			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
3363 				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
3364 			rc = -EINVAL;
3365 			goto out;
3366 		}
3367 
3368 		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
3369 		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
3370 		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
3371 		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
3372 		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev) ||
3373 		    raid_bdev->bdev.dif_pi_format != bdev->dif_pi_format) {
3374 			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
3375 				    raid_bdev->bdev.name, bdev->name);
3376 			rc = -EINVAL;
3377 			goto out;
3378 		}
3379 	}
3380 
3381 	base_info->configure_cb = cb_fn;
3382 	base_info->configure_cb_ctx = cb_ctx;
3383 
3384 	if (existing) {
3385 		raid_bdev_configure_base_bdev_cont(base_info);
3386 	} else {
3387 		/* check for existing superblock when using a new bdev */
3388 		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
3389 				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
3390 		if (rc) {
3391 			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3392 				    bdev->name, spdk_strerror(-rc));
3393 		}
3394 	}
3395 out:
3396 	if (rc != 0) {
3397 		raid_bdev_free_base_bdev_resource(base_info);
3398 	}
3399 	return rc;
3400 }
3401 
/*
 * Add a base bdev by name to an empty slot of the raid bdev.
 * Slot selection: while configuring, prefer a slot whose expected uuid matches
 * the bdev (e.g. recorded in a superblock); otherwise use the first completely
 * empty slot. cb_fn/cb_ctx are invoked when configuration completes.
 * Must be called from the app thread. Returns negative errno on failure.
 */
int
raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
			raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_base_bdev_info *base_info = NULL, *iter;
	int rc;

	assert(name != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->process != NULL) {
		/* Adding members while a background process runs is not allowed. */
		SPDK_ERRLOG("raid bdev '%s' is in process\n",
			    raid_bdev->bdev.name);
		return -EPERM;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);

		if (bdev != NULL) {
			/* Look for a slot that already expects this bdev's uuid. */
			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
				if (iter->name == NULL &&
				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
					base_info = iter;
					break;
				}
			}
		}
	}

	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		/* Fall back to the first slot with neither a name nor a uuid assigned. */
		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
				base_info = iter;
				break;
			}
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
			    raid_bdev->bdev.name, name);
		return -EINVAL;
	}

	assert(base_info->is_configured == false);

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		assert(base_info->data_size != 0);
		assert(base_info->desc == NULL);
	}

	base_info->name = strdup(name);
	if (base_info->name == NULL) {
		return -ENOMEM;
	}

	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
	/* -ENODEV while still configuring is tolerated - the bdev may appear later. */
	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
		free(base_info->name);
		base_info->name = NULL;
	}

	return rc;
}
3468 
/*
 * Create a raid bdev from an on-disk superblock found during examine.
 * The superblock contents are copied into the new raid bdev and each member
 * slot gets its uuid (for configured members), data offset and data size.
 * Returns 0 and sets *raid_bdev_out on success, negative errno on failure.
 */
static int
raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
{
	struct raid_bdev *raid_bdev;
	uint8_t i;
	int rc;

	/* (strip_size * block_size) / 1024 converts the superblock strip size to the KiB
	 * value expected by _raid_bdev_create() */
	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
			       sb->level, true, &sb->uuid, &raid_bdev);
	if (rc != 0) {
		return rc;
	}

	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
	if (rc != 0) {
		raid_bdev_free(raid_bdev);
		return rc;
	}

	/* Keep a copy of the on-disk superblock on the raid bdev. */
	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
	memcpy(raid_bdev->sb, sb, sb->length);

	for (i = 0; i < sb->base_bdevs_size; i++) {
		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];

		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
			/* Only configured members count as operational. */
			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
			raid_bdev->num_base_bdevs_operational++;
		}

		base_info->data_offset = sb_base_bdev->data_offset;
		base_info->data_size = sb_base_bdev->data_size;
	}

	*raid_bdev_out = raid_bdev;
	return 0;
}
3507 
3508 static void
3509 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3510 {
3511 	struct raid_bdev *raid_bdev;
3512 	struct raid_base_bdev_info *base_info;
3513 
3514 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3515 		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3516 			continue;
3517 		}
3518 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3519 			if (base_info->desc == NULL &&
3520 			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3521 			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3522 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3523 				break;
3524 			}
3525 		}
3526 	}
3527 }
3528 
/* Context for examining the remaining base bdevs of a raid bdev assembled from
 * a superblock, so that all members already registered get configured too. */
struct raid_bdev_examine_others_ctx {
	/* uuid of the raid bdev being assembled */
	struct spdk_uuid raid_bdev_uuid;
	/* slot index to resume the member scan from */
	uint8_t current_base_bdev_idx;
	/* user callback invoked when the examine chain completes */
	raid_base_bdev_cb cb_fn;
	void *cb_ctx;
};
3535 
3536 static void
3537 raid_bdev_examine_others_done(void *_ctx, int status)
3538 {
3539 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3540 
3541 	if (ctx->cb_fn != NULL) {
3542 		ctx->cb_fn(ctx->cb_ctx, status);
3543 	}
3544 	free(ctx);
3545 }
3546 
3547 typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
3548 		const struct raid_bdev_superblock *sb, int status, void *ctx);
3549 static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
3550 				     void *cb_ctx);
3551 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3552 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3553 static void raid_bdev_examine_others(void *_ctx, int status);
3554 
3555 static void
3556 raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3557 				 int status, void *_ctx)
3558 {
3559 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3560 
3561 	if (status != 0) {
3562 		raid_bdev_examine_others_done(ctx, status);
3563 		return;
3564 	}
3565 
3566 	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3567 }
3568 
/*
 * Examine the remaining base bdevs of a raid bdev created from a superblock,
 * resuming from ctx->current_base_bdev_idx. For each not-yet-configured member
 * whose bdev is already registered, load its superblock and re-enter the
 * examine chain; the chain completes via raid_bdev_examine_others_done().
 */
static void
raid_bdev_examine_others(void *_ctx, int status)
{
	struct raid_bdev_examine_others_ctx *ctx = _ctx;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	if (status != 0) {
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
	if (raid_bdev == NULL) {
		status = -ENODEV;
		goto out;
	}

	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
	     base_info++) {
		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
			continue;
		}

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* Skip members whose bdev is not registered yet - uuid is looked up as a bdev alias. */
		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
			continue;
		}

		/* Remember where to resume when the chain re-enters this function. */
		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);

		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
		if (status != 0) {
			continue;
		}
		return;
	}
out:
	raid_bdev_examine_others_done(ctx, status);
}
3611 
3612 static void
3613 raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3614 		     raid_base_bdev_cb cb_fn, void *cb_ctx)
3615 {
3616 	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
3617 	struct raid_bdev *raid_bdev;
3618 	struct raid_base_bdev_info *iter, *base_info;
3619 	uint8_t i;
3620 	int rc;
3621 
3622 	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
3623 		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
3624 			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
3625 		rc = -EINVAL;
3626 		goto out;
3627 	}
3628 
3629 	if (spdk_uuid_is_null(&sb->uuid)) {
3630 		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
3631 		rc = -EINVAL;
3632 		goto out;
3633 	}
3634 
3635 	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);
3636 
3637 	if (raid_bdev) {
3638 		if (sb->seq_number > raid_bdev->sb->seq_number) {
3639 			SPDK_DEBUGLOG(bdev_raid,
3640 				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
3641 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3642 
3643 			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
3644 				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
3645 					     raid_bdev->bdev.name, bdev->name);
3646 				rc = -EBUSY;
3647 				goto out;
3648 			}
3649 
3650 			/* remove and then recreate the raid bdev using the newer superblock */
3651 			raid_bdev_delete(raid_bdev, NULL, NULL);
3652 			raid_bdev = NULL;
3653 		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
3654 			SPDK_DEBUGLOG(bdev_raid,
3655 				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
3656 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3657 			/* use the current raid bdev superblock */
3658 			sb = raid_bdev->sb;
3659 		}
3660 	}
3661 
3662 	for (i = 0; i < sb->base_bdevs_size; i++) {
3663 		sb_base_bdev = &sb->base_bdevs[i];
3664 
3665 		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);
3666 
3667 		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3668 			break;
3669 		}
3670 	}
3671 
3672 	if (i == sb->base_bdevs_size) {
3673 		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
3674 		rc = -EINVAL;
3675 		goto out;
3676 	}
3677 
3678 	if (!raid_bdev) {
3679 		struct raid_bdev_examine_others_ctx *ctx;
3680 
3681 		ctx = calloc(1, sizeof(*ctx));
3682 		if (ctx == NULL) {
3683 			rc = -ENOMEM;
3684 			goto out;
3685 		}
3686 
3687 		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
3688 		if (rc != 0) {
3689 			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
3690 				    sb->name, spdk_strerror(-rc));
3691 			free(ctx);
3692 			goto out;
3693 		}
3694 
3695 		/* after this base bdev is configured, examine other base bdevs that may be present */
3696 		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
3697 		ctx->cb_fn = cb_fn;
3698 		ctx->cb_ctx = cb_ctx;
3699 
3700 		cb_fn = raid_bdev_examine_others;
3701 		cb_ctx = ctx;
3702 	}
3703 
3704 	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3705 		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
3706 		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3707 		assert(base_info->is_configured == false);
3708 		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
3709 		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
3710 		assert(spdk_uuid_is_null(&base_info->uuid));
3711 		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3712 		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
3713 		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3714 		if (rc != 0) {
3715 			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3716 				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3717 		}
3718 		goto out;
3719 	}
3720 
3721 	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
3722 		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
3723 			       bdev->name, raid_bdev->bdev.name);
3724 		rc = -EINVAL;
3725 		goto out;
3726 	}
3727 
3728 	base_info = NULL;
3729 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3730 		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3731 			base_info = iter;
3732 			break;
3733 		}
3734 	}
3735 
3736 	if (base_info == NULL) {
3737 		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
3738 			    bdev->name, raid_bdev->bdev.name);
3739 		rc = -EINVAL;
3740 		goto out;
3741 	}
3742 
3743 	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3744 	if (rc != 0) {
3745 		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3746 			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3747 	}
3748 out:
3749 	if (rc != 0 && cb_fn != 0) {
3750 		cb_fn(cb_ctx, rc);
3751 	}
3752 }
3753 
/* Context for reading a superblock from a bdev during examine */
struct raid_bdev_examine_ctx {
	/* temporary read-only descriptor for the examined bdev */
	struct spdk_bdev_desc *desc;
	/* io channel used for the superblock read */
	struct spdk_io_channel *ch;
	/* callback invoked with the load result */
	raid_bdev_examine_load_sb_cb cb;
	void *cb_ctx;
};
3760 
3761 static void
3762 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3763 {
3764 	if (!ctx) {
3765 		return;
3766 	}
3767 
3768 	if (ctx->ch) {
3769 		spdk_put_io_channel(ctx->ch);
3770 	}
3771 
3772 	if (ctx->desc) {
3773 		spdk_bdev_close(ctx->desc);
3774 	}
3775 
3776 	free(ctx);
3777 }
3778 
3779 static void
3780 raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3781 {
3782 	struct raid_bdev_examine_ctx *ctx = _ctx;
3783 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3784 
3785 	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3786 
3787 	raid_bdev_examine_ctx_free(ctx);
3788 }
3789 
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	/* Intentionally empty: events on the short-lived descriptor used only for
	 * reading the superblock during examine are ignored. */
}
3794 
3795 static int
3796 raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
3797 {
3798 	struct raid_bdev_examine_ctx *ctx;
3799 	int rc;
3800 
3801 	assert(cb != NULL);
3802 
3803 	ctx = calloc(1, sizeof(*ctx));
3804 	if (!ctx) {
3805 		return -ENOMEM;
3806 	}
3807 
3808 	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
3809 	if (rc) {
3810 		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
3811 		goto err;
3812 	}
3813 
3814 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3815 	if (!ctx->ch) {
3816 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
3817 		rc = -ENOMEM;
3818 		goto err;
3819 	}
3820 
3821 	ctx->cb = cb;
3822 	ctx->cb_ctx = cb_ctx;
3823 
3824 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
3825 	if (rc) {
3826 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3827 			    bdev_name, spdk_strerror(-rc));
3828 		goto err;
3829 	}
3830 
3831 	return 0;
3832 err:
3833 	raid_bdev_examine_ctx_free(ctx);
3834 	return rc;
3835 }
3836 
3837 static void
3838 raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3839 		       void *ctx)
3840 {
3841 	switch (status) {
3842 	case 0:
3843 		/* valid superblock found */
3844 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3845 		raid_bdev_examine_sb(sb, bdev, NULL, NULL);
3846 		break;
3847 	case -EINVAL:
3848 		/* no valid superblock, check if it can be claimed anyway */
3849 		raid_bdev_examine_no_sb(bdev);
3850 		break;
3851 	default:
3852 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3853 			    bdev->name, spdk_strerror(-status));
3854 		break;
3855 	}
3856 
3857 	spdk_bdev_module_examine_done(&g_raid_if);
3858 }
3859 
/*
 * brief:
 * raid_bdev_examine is the examine callback invoked by the lower layers,
 * e.g. the bdev_nvme layer. It checks whether this base bdev can be
 * claimed by a raid bdev.
 * params:
 * bdev - pointer to base bdev
 * returns:
 * none
 */
3870 static void
3871 raid_bdev_examine(struct spdk_bdev *bdev)
3872 {
3873 	int rc;
3874 
3875 	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
3876 		goto done;
3877 	}
3878 
3879 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3880 		raid_bdev_examine_no_sb(bdev);
3881 		goto done;
3882 	}
3883 
3884 	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
3885 	if (rc != 0) {
3886 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3887 			    bdev->name, spdk_strerror(-rc));
3888 		goto done;
3889 	}
3890 
3891 	return;
3892 done:
3893 	spdk_bdev_module_examine_done(&g_raid_if);
3894 }
3895 
3896 /* Log component for bdev raid bdev module */
3897 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3898