xref: /spdk/module/bdev/raid/bdev_raid.c (revision fc3c9b37ed4358fdcdb9e841103c151000d3f32c)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
16 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
17 #define RAID_BDEV_PROCESS_MAX_QD	16
18 
19 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024
20 
21 static bool g_shutdown_started = false;
22 
23 /* List of all raid bdevs */
24 struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
25 
26 static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
28 /*
29  * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
30  * contains the relationship of raid bdev io channel with base bdev io channels.
31  */
32 struct raid_bdev_io_channel {
33 	/* Array of IO channels of base bdevs */
34 	struct spdk_io_channel	**base_channel;
35 
36 	/* Private raid module IO channel */
37 	struct spdk_io_channel	*module_channel;
38 
39 	/* Background process data */
40 	struct {
41 		uint64_t offset;
42 		struct spdk_io_channel *target_ch;
43 		struct raid_bdev_io_channel *ch_processed;
44 	} process;
45 };
46 
/* Lifecycle states of a raid bdev background process (e.g. rebuild) */
enum raid_bdev_process_state {
	/* Process object created but not yet started */
	RAID_PROCESS_STATE_INIT,
	/* Process is actively running */
	RAID_PROCESS_STATE_RUNNING,
	/* Process is being stopped */
	RAID_PROCESS_STATE_STOPPING,
	/* Process has fully stopped */
	RAID_PROCESS_STATE_STOPPED,
};
53 
/* State of a background process (e.g. rebuild) operating on a raid bdev */
struct raid_bdev_process {
	/* The raid bdev the process operates on */
	struct raid_bdev		*raid_bdev;
	/* Type of the process */
	enum raid_process_type		type;
	/* Current lifecycle state */
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	/* Process's private raid bdev io channel */
	struct raid_bdev_io_channel	*raid_ch;
	/* Queue of process requests (bounded by RAID_BDEV_PROCESS_MAX_QD) */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* Upper bound for the per-iteration window size, in blocks */
	uint64_t			max_window_size;
	/* Size of the window currently being processed, in blocks */
	uint64_t			window_size;
	/* Remaining blocks in the current window */
	uint64_t			window_remaining;
	/* Status of the current window's processing */
	int				window_status;
	/* Offset of the current window; copied to each channel's process.offset */
	uint64_t			window_offset;
	/* True while the current window's LBA range is locked (quiesced) */
	bool				window_range_locked;
	/* The base bdev that is the target of the process */
	struct raid_base_bdev_info	*target;
	/* Overall process status */
	int				status;
	/* Actions to execute when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
};
71 
/* Callback registered to run when a raid bdev process finishes */
struct raid_process_finish_action {
	/* Function to call on process finish */
	spdk_msg_fn cb;
	/* Context passed to cb */
	void *cb_ctx;
	/* Entry in raid_bdev_process::finish_actions */
	TAILQ_ENTRY(raid_process_finish_action) link;
};
77 
/* Global raid bdev module options; read via raid_bdev_get_opts() and
 * updated via raid_bdev_set_opts(). */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
};
81 
82 void
83 raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
84 {
85 	*opts = g_opts;
86 }
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
126 struct spdk_io_channel *
127 raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
128 {
129 	return raid_ch->base_channel[idx];
130 }
131 
132 void *
133 raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
134 {
135 	assert(raid_ch->module_channel != NULL);
136 
137 	return spdk_io_channel_get_ctx(raid_ch->module_channel);
138 }
139 
140 /* Function declarations */
141 static void	raid_bdev_examine(struct spdk_bdev *bdev);
142 static int	raid_bdev_init(void);
143 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
144 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
145 
146 static void
147 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
148 {
149 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
150 
151 	if (raid_ch->process.target_ch != NULL) {
152 		spdk_put_io_channel(raid_ch->process.target_ch);
153 		raid_ch->process.target_ch = NULL;
154 	}
155 
156 	if (raid_ch->process.ch_processed != NULL) {
157 		free(raid_ch->process.ch_processed->base_channel);
158 		free(raid_ch->process.ch_processed);
159 		raid_ch->process.ch_processed = NULL;
160 	}
161 }
162 
163 static int
164 raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
165 {
166 	struct raid_bdev *raid_bdev = process->raid_bdev;
167 	struct raid_bdev_io_channel *raid_ch_processed;
168 	struct raid_base_bdev_info *base_info;
169 
170 	raid_ch->process.offset = process->window_offset;
171 
172 	/* In the future we may have other types of processes which don't use a target bdev,
173 	 * like data scrubbing or strip size migration. Until then, expect that there always is
174 	 * a process target. */
175 	assert(process->target != NULL);
176 
177 	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
178 	if (raid_ch->process.target_ch == NULL) {
179 		goto err;
180 	}
181 
182 	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
183 	if (raid_ch_processed == NULL) {
184 		goto err;
185 	}
186 	raid_ch->process.ch_processed = raid_ch_processed;
187 
188 	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
189 					  sizeof(*raid_ch_processed->base_channel));
190 	if (raid_ch_processed->base_channel == NULL) {
191 		goto err;
192 	}
193 
194 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
195 		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
196 
197 		if (base_info != process->target) {
198 			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
199 		} else {
200 			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
201 		}
202 	}
203 
204 	raid_ch_processed->module_channel = raid_ch->module_channel;
205 	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;
206 
207 	return 0;
208 err:
209 	raid_bdev_ch_process_cleanup(raid_ch);
210 	return -ENOMEM;
211 }
212 
213 /*
214  * brief:
215  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
216  * hierarchy from raid bdev to base bdev io channels. It will be called per core
217  * params:
218  * io_device - pointer to raid bdev io device represented by raid_bdev
219  * ctx_buf - pointer to context buffer for raid bdev io channel
220  * returns:
221  * 0 - success
222  * non zero - failure
223  */
224 static int
225 raid_bdev_create_cb(void *io_device, void *ctx_buf)
226 {
227 	struct raid_bdev            *raid_bdev = io_device;
228 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
229 	uint8_t i;
230 	int ret = -ENOMEM;
231 
232 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);
233 
234 	assert(raid_bdev != NULL);
235 	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
236 
237 	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
238 	if (!raid_ch->base_channel) {
239 		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
240 		return -ENOMEM;
241 	}
242 
243 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
244 		/*
245 		 * Get the spdk_io_channel for all the base bdevs. This is used during
246 		 * split logic to send the respective child bdev ios to respective base
247 		 * bdev io channel.
248 		 * Skip missing base bdevs and the process target, which should also be treated as
249 		 * missing until the process completes.
250 		 */
251 		if (raid_bdev->base_bdev_info[i].is_configured == false ||
252 		    raid_bdev->base_bdev_info[i].is_process_target == true) {
253 			continue;
254 		}
255 		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
256 						   raid_bdev->base_bdev_info[i].desc);
257 		if (!raid_ch->base_channel[i]) {
258 			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
259 			goto err;
260 		}
261 	}
262 
263 	if (raid_bdev->module->get_io_channel) {
264 		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
265 		if (!raid_ch->module_channel) {
266 			SPDK_ERRLOG("Unable to create io channel for raid module\n");
267 			goto err;
268 		}
269 	}
270 
271 	if (raid_bdev->process != NULL) {
272 		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
273 		if (ret != 0) {
274 			SPDK_ERRLOG("Failed to setup process io channel\n");
275 			goto err;
276 		}
277 	} else {
278 		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
279 	}
280 
281 	return 0;
282 err:
283 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
284 		if (raid_ch->base_channel[i] != NULL) {
285 			spdk_put_io_channel(raid_ch->base_channel[i]);
286 		}
287 	}
288 	free(raid_ch->base_channel);
289 
290 	raid_bdev_ch_process_cleanup(raid_ch);
291 
292 	return ret;
293 }
294 
295 /*
296  * brief:
297  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
298  * hierarchy from raid bdev to base bdev io channels. It will be called per core
299  * params:
300  * io_device - pointer to raid bdev io device represented by raid_bdev
301  * ctx_buf - pointer to context buffer for raid bdev io channel
302  * returns:
303  * none
304  */
305 static void
306 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
307 {
308 	struct raid_bdev *raid_bdev = io_device;
309 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
310 	uint8_t i;
311 
312 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
313 
314 	assert(raid_ch != NULL);
315 	assert(raid_ch->base_channel);
316 
317 	if (raid_ch->module_channel) {
318 		spdk_put_io_channel(raid_ch->module_channel);
319 	}
320 
321 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
322 		/* Free base bdev channels */
323 		if (raid_ch->base_channel[i] != NULL) {
324 			spdk_put_io_channel(raid_ch->base_channel[i]);
325 		}
326 	}
327 	free(raid_ch->base_channel);
328 	raid_ch->base_channel = NULL;
329 
330 	raid_bdev_ch_process_cleanup(raid_ch);
331 }
332 
333 /*
334  * brief:
335  * raid_bdev_cleanup is used to cleanup raid_bdev related data
336  * structures.
337  * params:
338  * raid_bdev - pointer to raid_bdev
339  * returns:
340  * none
341  */
342 static void
343 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
344 {
345 	struct raid_base_bdev_info *base_info;
346 
347 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
348 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
349 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
350 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
351 
352 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
353 		assert(base_info->desc == NULL);
354 		free(base_info->name);
355 	}
356 
357 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
358 }
359 
/* Free the raid_bdev structure itself along with its superblock,
 * base bdev info array and bdev name. */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_free_superblock(raid_bdev);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
368 
/* Convenience helper: detach the raid bdev from module data structures
 * (raid_bdev_cleanup) and then free its memory (raid_bdev_free). */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
375 
/* Mark a configured base bdev as no longer configured and drop it from the
 * discovered count. Also clears the process-target flag. */
static void
raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->is_configured);
	assert(raid_bdev->num_base_bdevs_discovered);
	raid_bdev->num_base_bdevs_discovered--;
	base_info->is_configured = false;
	base_info->is_process_target = false;
}
387 
388 /*
389  * brief:
390  * free resource of base bdev for raid bdev
391  * params:
392  * base_info - raid base bdev info
393  * returns:
394  * none
395  */
396 static void
397 raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
398 {
399 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
400 
401 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
402 
403 	free(base_info->name);
404 	base_info->name = NULL;
405 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
406 		spdk_uuid_set_null(&base_info->uuid);
407 	}
408 
409 	if (base_info->desc == NULL) {
410 		return;
411 	}
412 
413 	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
414 	spdk_bdev_close(base_info->desc);
415 	base_info->desc = NULL;
416 	spdk_put_io_channel(base_info->app_thread_ch);
417 	base_info->app_thread_ch = NULL;
418 
419 	if (base_info->is_configured) {
420 		raid_bdev_deconfigure_base_bdev(base_info);
421 	}
422 }
423 
424 static void
425 raid_bdev_io_device_unregister_cb(void *io_device)
426 {
427 	struct raid_bdev *raid_bdev = io_device;
428 
429 	if (raid_bdev->num_base_bdevs_discovered == 0) {
430 		/* Free raid_bdev when there are no base bdevs left */
431 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
432 		raid_bdev_cleanup(raid_bdev);
433 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
434 		raid_bdev_free(raid_bdev);
435 	} else {
436 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
437 	}
438 }
439 
440 void
441 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
442 {
443 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
444 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
445 	}
446 }
447 
448 static void
449 _raid_bdev_destruct(void *ctxt)
450 {
451 	struct raid_bdev *raid_bdev = ctxt;
452 	struct raid_base_bdev_info *base_info;
453 
454 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");
455 
456 	assert(raid_bdev->process == NULL);
457 
458 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
459 		/*
460 		 * Close all base bdev descriptors for which call has come from below
461 		 * layers.  Also close the descriptors if we have started shutdown.
462 		 */
463 		if (g_shutdown_started || base_info->remove_scheduled == true) {
464 			raid_bdev_free_base_bdev_resource(base_info);
465 		}
466 	}
467 
468 	if (g_shutdown_started) {
469 		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
470 	}
471 
472 	if (raid_bdev->module->stop != NULL) {
473 		if (raid_bdev->module->stop(raid_bdev) == false) {
474 			return;
475 		}
476 	}
477 
478 	raid_bdev_module_stop_done(raid_bdev);
479 }
480 
/* bdev destruct callback: defers the actual work to the app thread.
 * Returns 1 to indicate asynchronous destruct; spdk_bdev_destruct_done()
 * is called later from raid_bdev_io_device_unregister_cb(). */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
488 
489 static int
490 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
491 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
492 {
493 	struct spdk_dif_ctx dif_ctx;
494 	struct spdk_dif_error err_blk = {};
495 	int rc;
496 	struct spdk_dif_ctx_init_ext_opts dif_opts;
497 	struct iovec md_iov = {
498 		.iov_base	= md_buf,
499 		.iov_len	= num_blocks * bdev->md_len,
500 	};
501 
502 	if (md_buf == NULL) {
503 		return 0;
504 	}
505 
506 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
507 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
508 	rc = spdk_dif_ctx_init(&dif_ctx,
509 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
510 			       bdev->dif_is_head_of_md, bdev->dif_type,
511 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
512 			       0, 0, 0, 0, 0, &dif_opts);
513 	if (rc != 0) {
514 		SPDK_ERRLOG("Initialization of DIF context failed\n");
515 		return rc;
516 	}
517 
518 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
519 
520 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
521 	if (rc != 0) {
522 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
523 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
524 	}
525 
526 	return rc;
527 }
528 
529 int
530 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
531 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
532 {
533 	struct spdk_dif_ctx dif_ctx;
534 	struct spdk_dif_error err_blk = {};
535 	int rc;
536 	struct spdk_dif_ctx_init_ext_opts dif_opts;
537 	struct iovec md_iov = {
538 		.iov_base	= md_buf,
539 		.iov_len	= num_blocks * bdev->md_len,
540 	};
541 
542 	if (md_buf == NULL) {
543 		return 0;
544 	}
545 
546 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
547 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
548 	rc = spdk_dif_ctx_init(&dif_ctx,
549 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
550 			       bdev->dif_is_head_of_md, bdev->dif_type,
551 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
552 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
553 	if (rc != 0) {
554 		SPDK_ERRLOG("Initialization of DIF context failed\n");
555 		return rc;
556 	}
557 
558 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
559 	if (rc != 0) {
560 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
561 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
562 	}
563 
564 	return rc;
565 }
566 
567 /**
568  * Raid bdev I/O read/write wrapper for spdk_bdev_readv_blocks_ext function.
569  */
570 int
571 raid_bdev_readv_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
572 			   struct iovec *iov, int iovcnt, uint64_t offset_blocks,
573 			   uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
574 			   struct spdk_bdev_ext_io_opts *opts)
575 {
576 	return spdk_bdev_readv_blocks_ext(base_info->desc, ch, iov, iovcnt,
577 					  base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
578 }
579 
580 /**
581  * Raid bdev I/O read/write wrapper for spdk_bdev_writev_blocks_ext function.
582  */
583 int
584 raid_bdev_writev_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
585 			    struct iovec *iov, int iovcnt, uint64_t offset_blocks,
586 			    uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
587 			    struct spdk_bdev_ext_io_opts *opts)
588 {
589 	int rc;
590 	uint64_t remapped_offset_blocks = base_info->data_offset + offset_blocks;
591 
592 	if (spdk_unlikely(spdk_bdev_get_dif_type(&base_info->raid_bdev->bdev) != SPDK_DIF_DISABLE &&
593 			  base_info->raid_bdev->bdev.dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK)) {
594 
595 		rc = raid_bdev_remap_dix_reftag(opts->metadata, num_blocks, &base_info->raid_bdev->bdev,
596 						remapped_offset_blocks);
597 		if (rc != 0) {
598 			return rc;
599 		}
600 	}
601 
602 	return spdk_bdev_writev_blocks_ext(base_info->desc, ch, iov, iovcnt,
603 					   remapped_offset_blocks, num_blocks, cb, cb_arg, opts);
604 }
605 
/* Complete a raid I/O with the given status. For split I/Os this also drives
 * the second part of the split: when the first (unprocessed, higher-LBA) part
 * completes successfully, the remaining (processed, lower-LBA) part is
 * submitted on the processed channel instead of completing. On final
 * completion, either the registered completion_cb is invoked or the parent
 * bdev_io is completed (remapping DIX reference tags for successful reads
 * when reftag checking is enabled). */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original offset and md_buf from the parent bdev_io */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				/* The second part covers the first split.offset blocks */
				raid_io->num_blocks = raid_io->split.offset;
				/* Number of whole iovs consumed by the first part */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The iov at the split boundary belongs to both parts;
					 * trim it to the first part's share and restore its base. */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				/* Mark split as consumed and resubmit on the processed channel */
				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Completion of the whole split I/O (or of a failed first part):
		 * restore the original request fields before completing. */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		/* Successful reads on reftag-checked raid bdevs need their reference
		 * tags remapped back to the raid-relative offsets. */
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
669 
670 /*
671  * brief:
672  * raid_bdev_io_complete_part - signal the completion of a part of the expected
673  * base bdev IOs and complete the raid_io if this is the final expected IO.
674  * The caller should first set raid_io->base_bdev_io_remaining. This function
675  * will decrement this counter by the value of the 'completed' parameter and
676  * complete the raid_io if the counter reaches 0. The caller is free to
677  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
678  * it can represent e.g. blocks or IOs.
679  * params:
680  * raid_io - pointer to raid_bdev_io
681  * completed - the part of the raid_io that has been completed
682  * status - status of the base IO
683  * returns:
684  * true - if the raid_io is completed
685  * false - otherwise
686  */
687 bool
688 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
689 			   enum spdk_bdev_io_status status)
690 {
691 	assert(raid_io->base_bdev_io_remaining >= completed);
692 	raid_io->base_bdev_io_remaining -= completed;
693 
694 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
695 		raid_io->base_bdev_io_status = status;
696 	}
697 
698 	if (raid_io->base_bdev_io_remaining == 0) {
699 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
700 		return true;
701 	} else {
702 		return false;
703 	}
704 }
705 
706 /*
707  * brief:
708  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
709  * It will try to queue the IOs after storing the context to bdev wait queue logic.
710  * params:
711  * raid_io - pointer to raid_bdev_io
712  * bdev - the block device that the IO is submitted to
713  * ch - io channel
714  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
715  * returns:
716  * none
717  */
718 void
719 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
720 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
721 {
722 	raid_io->waitq_entry.bdev = bdev;
723 	raid_io->waitq_entry.cb_fn = cb_fn;
724 	raid_io->waitq_entry.cb_arg = raid_io;
725 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
726 }
727 
728 static void
729 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
730 {
731 	struct raid_bdev_io *raid_io = cb_arg;
732 
733 	spdk_bdev_free_io(bdev_io);
734 
735 	raid_bdev_io_complete_part(raid_io, 1, success ?
736 				   SPDK_BDEV_IO_STATUS_SUCCESS :
737 				   SPDK_BDEV_IO_STATUS_FAILED);
738 }
739 
740 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
741 
/* Retry entry point for raid_bdev_queue_io_wait(): resubmits a reset that
 * previously failed with -ENOMEM. */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	struct raid_bdev_io *raid_io = _raid_io;

	raid_bdev_submit_reset_request(raid_io);
}
749 
750 /*
751  * brief:
752  * raid_bdev_submit_reset_request function submits reset requests
753  * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
754  * which case it will queue it for later submission
755  * params:
756  * raid_io
757  * returns:
758  * none
759  */
760 static void
761 raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
762 {
763 	struct raid_bdev		*raid_bdev;
764 	int				ret;
765 	uint8_t				i;
766 	struct raid_base_bdev_info	*base_info;
767 	struct spdk_io_channel		*base_ch;
768 
769 	raid_bdev = raid_io->raid_bdev;
770 
771 	if (raid_io->base_bdev_io_remaining == 0) {
772 		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
773 	}
774 
775 	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
776 		base_info = &raid_bdev->base_bdev_info[i];
777 		base_ch = raid_io->raid_ch->base_channel[i];
778 		if (base_ch == NULL) {
779 			raid_io->base_bdev_io_submitted++;
780 			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
781 			continue;
782 		}
783 		ret = spdk_bdev_reset(base_info->desc, base_ch,
784 				      raid_base_bdev_reset_complete, raid_io);
785 		if (ret == 0) {
786 			raid_io->base_bdev_io_submitted++;
787 		} else if (ret == -ENOMEM) {
788 			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
789 						base_ch, _raid_bdev_submit_reset_request);
790 			return;
791 		} else {
792 			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
793 			assert(false);
794 			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
795 			return;
796 		}
797 	}
798 }
799 
/* Split a raid I/O at split_offset blocks: adjust the raid_io to describe only
 * the second part (the higher LBAs, from split_offset onward) and record the
 * split state so that raid_bdev_io_complete() can later submit/restore the
 * first part. If the split lands inside an iov, that iov is trimmed and its
 * original contents saved in split.iov_copy. */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	/* Byte offset of the split point within the iov array */
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Advance the request to cover only the second (higher-LBA) part */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Walk the iovs to find where the split point falls */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iov boundary */
				raid_io->split.iov = NULL;
			} else {
				/* Split falls inside this iov: save the original and trim
				 * it to the second part's share */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
837 
838 static void
839 raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
840 {
841 	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;
842 
843 	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
844 		uint64_t offset_begin = raid_io->offset_blocks;
845 		uint64_t offset_end = offset_begin + raid_io->num_blocks;
846 
847 		if (offset_end > raid_ch->process.offset) {
848 			if (offset_begin < raid_ch->process.offset) {
849 				/*
850 				 * If the I/O spans both the processed and unprocessed ranges,
851 				 * split it and first handle the unprocessed part. After it
852 				 * completes, the rest will be handled.
853 				 * This situation occurs when the process thread is not active
854 				 * or is waiting for the process window range to be locked
855 				 * (quiesced). When a window is being processed, such I/Os will be
856 				 * deferred by the bdev layer until the window is unlocked.
857 				 */
858 				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
859 					      raid_ch->process.offset, offset_begin, offset_end);
860 				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
861 			}
862 		} else {
863 			/* Use the child channel, which corresponds to the already processed range */
864 			raid_io->raid_ch = raid_ch->process.ch_processed;
865 		}
866 	}
867 
868 	raid_io->raid_bdev->module->submit_rw_request(raid_io);
869 }
870 
871 /*
872  * brief:
873  * Callback function to spdk_bdev_io_get_buf.
874  * params:
875  * ch - pointer to raid bdev io channel
876  * bdev_io - pointer to parent bdev_io on raid bdev device
877  * success - True if buffer is allocated or false otherwise.
878  * returns:
879  * none
880  */
881 static void
882 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
883 		     bool success)
884 {
885 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
886 
887 	if (!success) {
888 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
889 		return;
890 	}
891 
892 	raid_bdev_submit_rw_request(raid_io);
893 }
894 
895 void
896 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
897 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
898 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
899 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
900 {
901 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
902 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
903 
904 	raid_io->type = type;
905 	raid_io->offset_blocks = offset_blocks;
906 	raid_io->num_blocks = num_blocks;
907 	raid_io->iovs = iovs;
908 	raid_io->iovcnt = iovcnt;
909 	raid_io->memory_domain = memory_domain;
910 	raid_io->memory_domain_ctx = memory_domain_ctx;
911 	raid_io->md_buf = md_buf;
912 
913 	raid_io->raid_bdev = raid_bdev;
914 	raid_io->raid_ch = raid_ch;
915 	raid_io->base_bdev_io_remaining = 0;
916 	raid_io->base_bdev_io_submitted = 0;
917 	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
918 	raid_io->completion_cb = NULL;
919 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
920 }
921 
922 /*
923  * brief:
924  * raid_bdev_submit_request function is the submit_request function pointer of
925  * raid bdev function table. This is used to submit the io on raid_bdev to below
926  * layers.
927  * params:
928  * ch - pointer to raid bdev io channel
929  * bdev_io - pointer to parent bdev_io on raid bdev device
930  * returns:
931  * none
932  */
933 static void
934 raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
935 {
936 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
937 
938 	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
939 			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
940 			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
941 			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);
942 
943 	switch (bdev_io->type) {
944 	case SPDK_BDEV_IO_TYPE_READ:
945 		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
946 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
947 		break;
948 	case SPDK_BDEV_IO_TYPE_WRITE:
949 		raid_bdev_submit_rw_request(raid_io);
950 		break;
951 
952 	case SPDK_BDEV_IO_TYPE_RESET:
953 		raid_bdev_submit_reset_request(raid_io);
954 		break;
955 
956 	case SPDK_BDEV_IO_TYPE_FLUSH:
957 	case SPDK_BDEV_IO_TYPE_UNMAP:
958 		if (raid_io->raid_bdev->process != NULL) {
959 			/* TODO: rebuild support */
960 			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
961 			return;
962 		}
963 		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
964 		break;
965 
966 	default:
967 		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
968 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
969 		break;
970 	}
971 }
972 
973 /*
974  * brief:
975  * _raid_bdev_io_type_supported checks whether io_type is supported in
976  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
977  * doesn't support, the raid device doesn't supports.
978  *
979  * params:
980  * raid_bdev - pointer to raid bdev context
981  * io_type - io type
982  * returns:
983  * true - io_type is supported
984  * false - io_type is not supported
985  */
986 inline static bool
987 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
988 {
989 	struct raid_base_bdev_info *base_info;
990 
991 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
992 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
993 		if (raid_bdev->module->submit_null_payload_request == NULL) {
994 			return false;
995 		}
996 	}
997 
998 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
999 		if (base_info->desc == NULL) {
1000 			continue;
1001 		}
1002 
1003 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
1004 			return false;
1005 		}
1006 	}
1007 
1008 	return true;
1009 }
1010 
1011 /*
1012  * brief:
1013  * raid_bdev_io_type_supported is the io_supported function for bdev function
1014  * table which returns whether the particular io type is supported or not by
1015  * raid bdev module
1016  * params:
1017  * ctx - pointer to raid bdev context
1018  * type - io type
1019  * returns:
1020  * true - io_type is supported
1021  * false - io_type is not supported
1022  */
1023 static bool
1024 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1025 {
1026 	switch (io_type) {
1027 	case SPDK_BDEV_IO_TYPE_READ:
1028 	case SPDK_BDEV_IO_TYPE_WRITE:
1029 		return true;
1030 
1031 	case SPDK_BDEV_IO_TYPE_FLUSH:
1032 	case SPDK_BDEV_IO_TYPE_RESET:
1033 	case SPDK_BDEV_IO_TYPE_UNMAP:
1034 		return _raid_bdev_io_type_supported(ctx, io_type);
1035 
1036 	default:
1037 		return false;
1038 	}
1039 
1040 	return false;
1041 }
1042 
/*
 * brief:
 * raid_bdev_get_io_channel is the get_io_channel function table pointer for
 * raid bdev. It returns an io channel for this raid bdev.
 * params:
 * ctxt - pointer to raid_bdev (registered as the io device)
 * returns:
 * pointer to io channel for raid bdev
 */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev pointer itself is the io_device key. */
	return spdk_get_io_channel(ctxt);
}
1059 
/* Write the raid bdev's runtime information (state, level, process progress,
 * base bdev list) as named JSON members into an already-open object. */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;

	assert(raid_bdev != NULL);
	/* Base bdev state is only stable on the app thread. */
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	/* If a background process (e.g. rebuild) is running, report its progress. */
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		/* Percentage computed in floating point, truncated to uint32. */
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		/* A removed base bdev has no name; emit JSON null in its place. */
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			spdk_json_write_null(w);
		}
		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
1109 
1110 /*
1111  * brief:
1112  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1113  * params:
1114  * ctx - pointer to raid_bdev
1115  * w - pointer to json context
1116  * returns:
1117  * 0 - success
1118  * non zero - failure
1119  */
1120 static int
1121 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1122 {
1123 	struct raid_bdev *raid_bdev = ctx;
1124 
1125 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1126 
1127 	/* Dump the raid bdev configuration related information */
1128 	spdk_json_write_named_object_begin(w, "raid");
1129 	raid_bdev_write_info_json(raid_bdev, w);
1130 	spdk_json_write_object_end(w);
1131 
1132 	return 0;
1133 }
1134 
/*
 * brief:
 * raid_bdev_write_config_json is the write_config_json function table pointer
 * for raid bdev. It emits a bdev_raid_create RPC entry that would recreate
 * this raid bdev on config replay.
 * params:
 * bdev - pointer to spdk_bdev
 * w - pointer to json context
 * returns:
 * none
 */
static void
raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct raid_bdev *raid_bdev = bdev->ctxt;
	struct raid_base_bdev_info *base_info;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->superblock_enabled) {
		/* raid bdev configuration is stored in the superblock */
		return;
	}

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	/* strip_size_kb of 0 means "not applicable" (raid1); omit it then. */
	if (raid_bdev->strip_size_kb != 0) {
		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	}
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));

	spdk_json_write_named_array_begin(w, "base_bdevs");
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			/* Removed base bdevs keep their slot with a placeholder name. */
			char str[32];

			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
			spdk_json_write_string(w, str);
		}
	}
	spdk_json_write_array_end(w);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1185 
/* get_memory_domains function table callback: collect memory domains from all
 * configured base bdevs. Follows the standard two-call protocol: when
 * 'domains' is NULL or too small, only the required count is returned.
 * Returns the total domain count on success, negative errno on failure. */
static int
raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_base_bdev_info *base_info;
	int domains_count = 0, rc = 0;

	/* Modules that don't support memory domains report none. */
	if (raid_bdev->module->memory_domains_supported == false) {
		return 0;
	}

	/* First loop to get the number of memory domains */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->is_configured == false) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
		if (rc < 0) {
			return rc;
		}
		domains_count += rc;
	}

	/* Caller's array too small (or absent): report required size only. */
	if (!domains || array_size < domains_count) {
		return domains_count;
	}

	/* Second pass: fill the caller's array, advancing past each base
	 * bdev's contribution. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->is_configured == false) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
		if (rc < 0) {
			return rc;
		}
		domains += rc;
		array_size -= rc;
	}

	return domains_count;
}
1227 
/* g_raid_bdev_fn_table is the function table for raid bdev, registered with
 * the bdev layer for every raid bdev via spdk_bdev_register(). */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1238 
1239 struct raid_bdev *
1240 raid_bdev_find_by_name(const char *name)
1241 {
1242 	struct raid_bdev *raid_bdev;
1243 
1244 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1245 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1246 			return raid_bdev;
1247 		}
1248 	}
1249 
1250 	return NULL;
1251 }
1252 
1253 static struct raid_bdev *
1254 raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1255 {
1256 	struct raid_bdev *raid_bdev;
1257 
1258 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1259 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1260 			return raid_bdev;
1261 		}
1262 	}
1263 
1264 	return NULL;
1265 }
1266 
/* Mapping of user-facing raid level names (including short aliases like "0")
 * to enum raid_level values. Terminated by a zeroed sentinel entry. */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1280 
/* Names for enum raid_bdev_state, indexed by state value;
 * RAID_BDEV_STATE_MAX acts as the NULL terminator. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};
1287 
/* Names for enum raid_process_type, indexed by type value;
 * RAID_PROCESS_MAX acts as the NULL terminator. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1293 
1294 /* We have to use the typedef in the function declaration to appease astyle. */
1295 typedef enum raid_level raid_level_t;
1296 typedef enum raid_bdev_state raid_bdev_state_t;
1297 
1298 raid_level_t
1299 raid_bdev_str_to_level(const char *str)
1300 {
1301 	unsigned int i;
1302 
1303 	assert(str != NULL);
1304 
1305 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1306 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1307 			return g_raid_level_names[i].value;
1308 		}
1309 	}
1310 
1311 	return INVALID_RAID_LEVEL;
1312 }
1313 
1314 const char *
1315 raid_bdev_level_to_str(enum raid_level level)
1316 {
1317 	unsigned int i;
1318 
1319 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1320 		if (g_raid_level_names[i].value == level) {
1321 			return g_raid_level_names[i].name;
1322 		}
1323 	}
1324 
1325 	return "";
1326 }
1327 
1328 raid_bdev_state_t
1329 raid_bdev_str_to_state(const char *str)
1330 {
1331 	unsigned int i;
1332 
1333 	assert(str != NULL);
1334 
1335 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1336 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1337 			break;
1338 		}
1339 	}
1340 
1341 	return i;
1342 }
1343 
1344 const char *
1345 raid_bdev_state_to_str(enum raid_bdev_state state)
1346 {
1347 	if (state >= RAID_BDEV_STATE_MAX) {
1348 		return "";
1349 	}
1350 
1351 	return g_raid_state_names[state];
1352 }
1353 
1354 const char *
1355 raid_bdev_process_to_str(enum raid_process_type value)
1356 {
1357 	if (value >= RAID_PROCESS_MAX) {
1358 		return "";
1359 	}
1360 
1361 	return g_raid_process_type_names[value];
1362 }
1363 
1364 /*
1365  * brief:
1366  * raid_bdev_fini_start is called when bdev layer is starting the
1367  * shutdown process
1368  * params:
1369  * none
1370  * returns:
1371  * none
1372  */
1373 static void
1374 raid_bdev_fini_start(void)
1375 {
1376 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1377 	g_shutdown_started = true;
1378 }
1379 
1380 /*
1381  * brief:
1382  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1383  * params:
1384  * none
1385  * returns:
1386  * none
1387  */
1388 static void
1389 raid_bdev_exit(void)
1390 {
1391 	struct raid_bdev *raid_bdev, *tmp;
1392 
1393 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1394 
1395 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1396 		raid_bdev_cleanup_and_free(raid_bdev);
1397 	}
1398 }
1399 
1400 static void
1401 raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
1402 {
1403 	spdk_json_write_object_begin(w);
1404 
1405 	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");
1406 
1407 	spdk_json_write_named_object_begin(w, "params");
1408 	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
1409 	spdk_json_write_object_end(w);
1410 
1411 	spdk_json_write_object_end(w);
1412 }
1413 
/* config_json module callback: only module-level options are written here;
 * per-bdev configuration is handled by write_config_json or the superblock. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);
	return 0;
}
1421 
1422 /*
1423  * brief:
1424  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1425  * module
1426  * params:
1427  * none
1428  * returns:
1429  * size of spdk_bdev_io context for raid
1430  */
1431 static int
1432 raid_bdev_get_ctx_size(void)
1433 {
1434 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
1435 	return sizeof(struct raid_bdev_io);
1436 }
1437 
/* Bdev module descriptor for the raid module; registered with the bdev layer
 * below. examine_disk lets the module claim base bdevs as they appear. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1450 
/*
 * brief:
 * raid_bdev_init is the module_init callback for the raid bdev module.
 * params:
 * none
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_init(void)
{
	/* No global initialization is required. */
	return 0;
}
1465 
1466 static int
1467 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1468 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1469 		  struct raid_bdev **raid_bdev_out)
1470 {
1471 	struct raid_bdev *raid_bdev;
1472 	struct spdk_bdev *raid_bdev_gen;
1473 	struct raid_bdev_module *module;
1474 	struct raid_base_bdev_info *base_info;
1475 	uint8_t min_operational;
1476 
1477 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1478 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1479 		return -EINVAL;
1480 	}
1481 
1482 	if (raid_bdev_find_by_name(name) != NULL) {
1483 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1484 		return -EEXIST;
1485 	}
1486 
1487 	if (level == RAID1) {
1488 		if (strip_size != 0) {
1489 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1490 			return -EINVAL;
1491 		}
1492 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1493 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1494 		return -EINVAL;
1495 	}
1496 
1497 	module = raid_bdev_module_find(level);
1498 	if (module == NULL) {
1499 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1500 		return -EINVAL;
1501 	}
1502 
1503 	assert(module->base_bdevs_min != 0);
1504 	if (num_base_bdevs < module->base_bdevs_min) {
1505 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1506 			    module->base_bdevs_min,
1507 			    raid_bdev_level_to_str(level));
1508 		return -EINVAL;
1509 	}
1510 
1511 	switch (module->base_bdevs_constraint.type) {
1512 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1513 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1514 		break;
1515 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1516 		min_operational = module->base_bdevs_constraint.value;
1517 		break;
1518 	case CONSTRAINT_UNSET:
1519 		if (module->base_bdevs_constraint.value != 0) {
1520 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1521 				    (uint8_t)module->base_bdevs_constraint.value, name);
1522 			return -EINVAL;
1523 		}
1524 		min_operational = num_base_bdevs;
1525 		break;
1526 	default:
1527 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1528 			    (uint8_t)module->base_bdevs_constraint.type,
1529 			    raid_bdev_level_to_str(module->level));
1530 		return -EINVAL;
1531 	};
1532 
1533 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1534 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1535 			    raid_bdev_level_to_str(module->level));
1536 		return -EINVAL;
1537 	}
1538 
1539 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1540 	if (!raid_bdev) {
1541 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1542 		return -ENOMEM;
1543 	}
1544 
1545 	raid_bdev->module = module;
1546 	raid_bdev->num_base_bdevs = num_base_bdevs;
1547 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1548 					   sizeof(struct raid_base_bdev_info));
1549 	if (!raid_bdev->base_bdev_info) {
1550 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1551 		raid_bdev_free(raid_bdev);
1552 		return -ENOMEM;
1553 	}
1554 
1555 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1556 		base_info->raid_bdev = raid_bdev;
1557 	}
1558 
1559 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1560 	 * internally and set later.
1561 	 */
1562 	raid_bdev->strip_size = 0;
1563 	raid_bdev->strip_size_kb = strip_size;
1564 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1565 	raid_bdev->level = level;
1566 	raid_bdev->min_base_bdevs_operational = min_operational;
1567 	raid_bdev->superblock_enabled = superblock_enabled;
1568 
1569 	raid_bdev_gen = &raid_bdev->bdev;
1570 
1571 	raid_bdev_gen->name = strdup(name);
1572 	if (!raid_bdev_gen->name) {
1573 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1574 		raid_bdev_free(raid_bdev);
1575 		return -ENOMEM;
1576 	}
1577 
1578 	raid_bdev_gen->product_name = "Raid Volume";
1579 	raid_bdev_gen->ctxt = raid_bdev;
1580 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1581 	raid_bdev_gen->module = &g_raid_if;
1582 	raid_bdev_gen->write_cache = 0;
1583 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1584 
1585 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1586 
1587 	*raid_bdev_out = raid_bdev;
1588 
1589 	return 0;
1590 }
1591 
1592 /*
1593  * brief:
1594  * raid_bdev_create allocates raid bdev based on passed configuration
1595  * params:
1596  * name - name for raid bdev
1597  * strip_size - strip size in KB
1598  * num_base_bdevs - number of base bdevs
1599  * level - raid level
1600  * superblock_enabled - true if raid should have superblock
1601  * uuid - uuid to set for the bdev
1602  * raid_bdev_out - the created raid bdev
1603  * returns:
1604  * 0 - success
1605  * non zero - failure
1606  */
1607 int
1608 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1609 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1610 		 struct raid_bdev **raid_bdev_out)
1611 {
1612 	struct raid_bdev *raid_bdev;
1613 	int rc;
1614 
1615 	assert(uuid != NULL);
1616 
1617 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1618 			       &raid_bdev);
1619 	if (rc != 0) {
1620 		return rc;
1621 	}
1622 
1623 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1624 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1625 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1626 	}
1627 
1628 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1629 
1630 	*raid_bdev_out = raid_bdev;
1631 
1632 	return 0;
1633 }
1634 
1635 static void
1636 _raid_bdev_unregistering_cont(void *ctx)
1637 {
1638 	struct raid_bdev *raid_bdev = ctx;
1639 
1640 	spdk_bdev_close(raid_bdev->self_desc);
1641 	raid_bdev->self_desc = NULL;
1642 }
1643 
1644 static void
1645 raid_bdev_unregistering_cont(void *ctx)
1646 {
1647 	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
1648 }
1649 
1650 static int
1651 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1652 {
1653 	struct raid_process_finish_action *finish_action;
1654 
1655 	assert(spdk_get_thread() == process->thread);
1656 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1657 
1658 	finish_action = calloc(1, sizeof(*finish_action));
1659 	if (finish_action == NULL) {
1660 		return -ENOMEM;
1661 	}
1662 
1663 	finish_action->cb = cb;
1664 	finish_action->cb_ctx = cb_ctx;
1665 
1666 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1667 
1668 	return 0;
1669 }
1670 
1671 static void
1672 raid_bdev_unregistering_stop_process(void *ctx)
1673 {
1674 	struct raid_bdev_process *process = ctx;
1675 	struct raid_bdev *raid_bdev = process->raid_bdev;
1676 	int rc;
1677 
1678 	process->state = RAID_PROCESS_STATE_STOPPING;
1679 	if (process->status == 0) {
1680 		process->status = -ECANCELED;
1681 	}
1682 
1683 	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
1684 	if (rc != 0) {
1685 		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
1686 			    raid_bdev->bdev.name, spdk_strerror(-rc));
1687 	}
1688 }
1689 
1690 static void
1691 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1692 {
1693 	struct raid_bdev *raid_bdev = event_ctx;
1694 
1695 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1696 		if (raid_bdev->process != NULL) {
1697 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1698 					     raid_bdev->process);
1699 		} else {
1700 			raid_bdev_unregistering_cont(raid_bdev);
1701 		}
1702 	}
1703 }
1704 
/* Final stage of configuration: register the io device and the bdev, and open
 * an internal descriptor. On any failure the raid bdev is rolled back to the
 * CONFIGURING state. */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	/* The raid_bdev pointer is the io_device key (see raid_bdev_get_io_channel). */
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Roll back: stop the module, drop the io device, return to CONFIGURING. */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1753 
1754 static void
1755 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1756 {
1757 	if (status == 0) {
1758 		raid_bdev_configure_cont(raid_bdev);
1759 	} else {
1760 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1761 			    raid_bdev->bdev.name, spdk_strerror(-status));
1762 		if (raid_bdev->module->stop != NULL) {
1763 			raid_bdev->module->stop(raid_bdev);
1764 		}
1765 	}
1766 }
1767 
/*
 * brief:
 * If raid bdev config is complete, then only register the raid bdev to
 * bdev layer and remove this raid bdev from configuring list and
 * insert the raid bdev to configured list
 * params:
 * raid_bdev - pointer to raid bdev
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);

	/* Let the raid level module finish geometry setup (blockcnt etc.). */
	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	if (raid_bdev->superblock_enabled) {
		if (raid_bdev->sb == NULL) {
			/* Fresh superblock: allocate and initialize it. */
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			/* Existing superblock (loaded from disk): validate it
			 * against the geometry computed by module->start(). */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			/* Undo module->start() before failing. */
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		/* Configuration continues asynchronously from the write callback. */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1837 
1838 /*
1839  * brief:
1840  * If raid bdev is online and registered, change the bdev state to
1841  * configuring and unregister this raid device. Queue this raid device
1842  * in configuring list
1843  * params:
1844  * raid_bdev - pointer to raid bdev
1845  * cb_fn - callback function
1846  * cb_arg - argument to callback function
1847  * returns:
1848  * none
1849  */
1850 static void
1851 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1852 		      void *cb_arg)
1853 {
1854 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1855 		if (cb_fn) {
1856 			cb_fn(cb_arg, 0);
1857 		}
1858 		return;
1859 	}
1860 
1861 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1862 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1863 
1864 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1865 }
1866 
1867 /*
1868  * brief:
1869  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1870  * params:
1871  * base_bdev - pointer to base bdev
1872  * returns:
1873  * base bdev info if found, otherwise NULL.
1874  */
1875 static struct raid_base_bdev_info *
1876 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1877 {
1878 	struct raid_bdev *raid_bdev;
1879 	struct raid_base_bdev_info *base_info;
1880 
1881 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1882 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1883 			if (base_info->desc != NULL &&
1884 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1885 				return base_info;
1886 			}
1887 		}
1888 	}
1889 
1890 	return NULL;
1891 }
1892 
/* Final step of base bdev removal: clear the scheduled flag, update the
 * operational count, possibly deconfigure the whole raid bdev, and notify
 * the removal initiator. */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			/* Deconfigure takes over the remove callback; do not call it here. */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1914 
1915 static void
1916 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1917 {
1918 	struct raid_base_bdev_info *base_info = ctx;
1919 
1920 	if (status != 0) {
1921 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1922 			    raid_bdev->bdev.name, spdk_strerror(-status));
1923 	}
1924 
1925 	raid_bdev_remove_base_bdev_done(base_info, status);
1926 }
1927 
/* Unquiesce completion during base bdev removal. If a superblock exists, mark
 * the removed slot as failed and persist it before finishing; otherwise
 * finish immediately. */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry for this slot and mark it failed. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* TODO: distinguish between failure and intentional removal */
				sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

				/* Removal completes from the superblock write callback. */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1961 
1962 static void
1963 raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
1964 {
1965 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1966 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1967 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
1968 	uint8_t idx = raid_bdev_base_bdev_slot(base_info);
1969 
1970 	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);
1971 
1972 	if (raid_ch->base_channel[idx] != NULL) {
1973 		spdk_put_io_channel(raid_ch->base_channel[idx]);
1974 		raid_ch->base_channel[idx] = NULL;
1975 	}
1976 
1977 	if (raid_ch->process.ch_processed != NULL) {
1978 		raid_ch->process.ch_processed->base_channel[idx] = NULL;
1979 	}
1980 
1981 	spdk_for_each_channel_continue(i, 0);
1982 }
1983 
1984 static void
1985 raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
1986 {
1987 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1988 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1989 
1990 	raid_bdev_free_base_bdev_resource(base_info);
1991 
1992 	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
1993 			    base_info);
1994 }
1995 
1996 static void
1997 raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
1998 {
1999 	struct raid_base_bdev_info *base_info = ctx;
2000 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2001 
2002 	if (status != 0) {
2003 		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
2004 			    raid_bdev->bdev.name, spdk_strerror(-status));
2005 		raid_bdev_remove_base_bdev_done(base_info, status);
2006 		return;
2007 	}
2008 
2009 	raid_bdev_deconfigure_base_bdev(base_info);
2010 
2011 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
2012 			      raid_bdev_channels_remove_base_bdev_done);
2013 }
2014 
2015 static int
2016 raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
2017 {
2018 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2019 
2020 	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
2021 				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
2022 }
2023 
/* Message context for removing a base bdev while a background process runs. */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The background process that may have to be stopped first */
	struct raid_bdev_process *process;
	/* The base bdev being removed */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of the operational base bdev count taken at submission time */
	uint8_t num_base_bdevs_operational;
};
2029 
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc;

	/* Back on the app thread - resume the removal by quiescing the raid bdev. */
	rc = raid_bdev_remove_base_bdev_quiesce(base_info);
	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2041 
2042 static void
2043 raid_bdev_process_base_bdev_remove_cont(void *_ctx)
2044 {
2045 	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2046 	struct raid_base_bdev_info *base_info = ctx->base_info;
2047 
2048 	free(ctx);
2049 
2050 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
2051 			     base_info);
2052 }
2053 
/*
 * Runs on the process thread. Decides whether the background process must be
 * stopped before the base bdev removal can proceed, and if so schedules the
 * removal continuation as a process finish action.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	/* The process must be past init but not yet fully stopped here */
	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the removal until the process has fully stopped */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Record why the process is stopping, unless it already failed */
	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
2084 
/*
 * Initiate removal of a base bdev while a background process is running.
 * The decision whether the process must be stopped is made on the process
 * thread. Returns 0 on success or -ENOMEM on allocation failure.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (base_info->is_configured && !base_info->remove_scheduled) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2120 
/*
 * Schedule removal of a base bdev from its raid bdev. Depending on the raid
 * bdev state this either cleans up immediately (not online), deconfigures the
 * whole raid bdev (removal not tolerated), or quiesces and removes just the
 * one base bdev - stopping a background process first if one is running.
 * Must be called on the app thread. Returns 0 on success, negative errno on
 * failure; cb_fn is invoked when the removal completes.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Reject duplicate or premature removal requests */
	if (base_info->remove_scheduled || !base_info->is_configured) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0 &&
		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		/* A running background process must be handled before quiescing */
		if (raid_bdev->process != NULL) {
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		/* On failure, allow the removal to be retried later */
		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2178 
2179 /*
2180  * brief:
2181  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2182  * is removed. This function checks if this base bdev is part of any raid bdev
2183  * or not. If yes, it takes necessary action on that particular raid bdev.
2184  * params:
2185  * base_bdev - pointer to base bdev which got removed
2186  * cb_fn - callback function
2187  * cb_arg - argument to callback function
2188  * returns:
2189  * 0 - success
2190  * non zero - failure
2191  */
2192 int
2193 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2194 {
2195 	struct raid_base_bdev_info *base_info;
2196 
2197 	/* Find the raid_bdev which has claimed this base_bdev */
2198 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2199 	if (!base_info) {
2200 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2201 		return -ENODEV;
2202 	}
2203 
2204 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2205 }
2206 
2207 static void
2208 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2209 {
2210 	if (status != 0) {
2211 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2212 			    raid_bdev->bdev.name, spdk_strerror(-status));
2213 	}
2214 }
2215 
/*
 * brief:
 * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
 * is resized. This function checks if the smallest size of the base_bdevs is changed.
 * If yes, call module handler to resize the raid_bdev if implemented.
 * params:
 * base_bdev - pointer to base bdev which got resized.
 * returns:
 * none
 */
static void
raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
{
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	uint64_t blockcnt_old;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");

	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);

	/* Find the raid_bdev which has claimed this base_bdev */
	if (!base_info) {
		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
		return;
	}
	raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);

	/* Record the base bdev's new size */
	base_info->blockcnt = base_bdev->blockcnt;

	/* Resizing the raid bdev itself is optional module functionality */
	if (!raid_bdev->module->resize) {
		return;
	}

	blockcnt_old = raid_bdev->bdev.blockcnt;
	if (raid_bdev->module->resize(raid_bdev) == false) {
		/* The module decided the raid bdev size is unchanged */
		return;
	}

	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);

	/* Persist the new per-base data sizes and raid size in the superblock.
	 * Note: base_info is reused here as a loop cursor. */
	if (raid_bdev->superblock_enabled) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t i;

		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
				sb_base_bdev->data_size = base_info->data_size;
			}
		}
		sb->raid_size = raid_bdev->bdev.blockcnt;
		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
	}
}
2279 
2280 /*
2281  * brief:
2282  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2283  * triggers asynchronous event.
2284  * params:
2285  * type - event details.
2286  * bdev - bdev that triggered event.
2287  * event_ctx - context for event.
2288  * returns:
2289  * none
2290  */
2291 static void
2292 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2293 			  void *event_ctx)
2294 {
2295 	int rc;
2296 
2297 	switch (type) {
2298 	case SPDK_BDEV_EVENT_REMOVE:
2299 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2300 		if (rc != 0) {
2301 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2302 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2303 		}
2304 		break;
2305 	case SPDK_BDEV_EVENT_RESIZE:
2306 		raid_bdev_resize_base_bdev(bdev);
2307 		break;
2308 	default:
2309 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2310 		break;
2311 	}
2312 }
2313 
2314 /*
2315  * brief:
2316  * Deletes the specified raid bdev
2317  * params:
2318  * raid_bdev - pointer to raid bdev
2319  * cb_fn - callback function
2320  * cb_arg - argument to callback function
2321  */
2322 void
2323 raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
2324 {
2325 	struct raid_base_bdev_info *base_info;
2326 
2327 	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
2328 
2329 	if (raid_bdev->destroy_started) {
2330 		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
2331 			      raid_bdev->bdev.name);
2332 		if (cb_fn) {
2333 			cb_fn(cb_arg, -EALREADY);
2334 		}
2335 		return;
2336 	}
2337 
2338 	raid_bdev->destroy_started = true;
2339 
2340 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2341 		base_info->remove_scheduled = true;
2342 
2343 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2344 			/*
2345 			 * As raid bdev is not registered yet or already unregistered,
2346 			 * so cleanup should be done here itself.
2347 			 */
2348 			raid_bdev_free_base_bdev_resource(base_info);
2349 		}
2350 	}
2351 
2352 	if (raid_bdev->num_base_bdevs_discovered == 0) {
2353 		/* There is no base bdev for this raid, so free the raid device. */
2354 		raid_bdev_cleanup_and_free(raid_bdev);
2355 		if (cb_fn) {
2356 			cb_fn(cb_arg, 0);
2357 		}
2358 	} else {
2359 		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
2360 	}
2361 }
2362 
2363 static void
2364 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2365 {
2366 	if (status != 0) {
2367 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2368 			    raid_bdev->bdev.name, spdk_strerror(-status));
2369 	}
2370 }
2371 
2372 static void
2373 raid_bdev_process_finish_write_sb(void *ctx)
2374 {
2375 	struct raid_bdev *raid_bdev = ctx;
2376 	struct raid_bdev_superblock *sb = raid_bdev->sb;
2377 	struct raid_bdev_sb_base_bdev *sb_base_bdev;
2378 	struct raid_base_bdev_info *base_info;
2379 	uint8_t i;
2380 
2381 	for (i = 0; i < sb->base_bdevs_size; i++) {
2382 		sb_base_bdev = &sb->base_bdevs[i];
2383 
2384 		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
2385 		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2386 			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2387 			if (base_info->is_configured) {
2388 				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
2389 				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
2390 			}
2391 		}
2392 	}
2393 
2394 	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
2395 }
2396 
2397 static void raid_bdev_process_free(struct raid_bdev_process *process);
2398 
2399 static void
2400 _raid_bdev_process_finish_done(void *ctx)
2401 {
2402 	struct raid_bdev_process *process = ctx;
2403 	struct raid_process_finish_action *finish_action;
2404 
2405 	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
2406 		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
2407 		finish_action->cb(finish_action->cb_ctx);
2408 		free(finish_action);
2409 	}
2410 
2411 	raid_bdev_process_free(process);
2412 
2413 	spdk_thread_exit(spdk_get_thread());
2414 }
2415 
2416 static void
2417 raid_bdev_process_finish_target_removed(void *ctx, int status)
2418 {
2419 	struct raid_bdev_process *process = ctx;
2420 
2421 	if (status != 0) {
2422 		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
2423 	}
2424 
2425 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2426 }
2427 
/*
 * Called when the raid bdev has been unquiesced at the end of the process.
 * If the process failed, its target base bdev is removed before the final
 * teardown; otherwise teardown completes directly on the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		/* status is reused here for the removal result */
		status = _raid_bdev_remove_base_bdev(process->target, raid_bdev_process_finish_target_removed,
						     process);
		if (status != 0) {
			raid_bdev_process_finish_target_removed(process, status);
		}
		return;
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2448 
2449 static void
2450 raid_bdev_process_finish_unquiesce(void *ctx)
2451 {
2452 	struct raid_bdev_process *process = ctx;
2453 	int rc;
2454 
2455 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2456 				 raid_bdev_process_finish_unquiesced, process);
2457 	if (rc != 0) {
2458 		raid_bdev_process_finish_unquiesced(process, rc);
2459 	}
2460 }
2461 
/*
 * Runs on the process thread after all channels finished their process
 * cleanup. Logs the result, schedules a superblock update on success and
 * finally hands off to the app thread for the unquiesce.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* Release the process's raid bdev IO channel, if it was acquired */
	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		if (raid_bdev->superblock_enabled) {
			/* Superblock writes must happen on the app thread */
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2493 
2494 static void
2495 __raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
2496 {
2497 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2498 
2499 	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
2500 }
2501 
/*
 * Per-channel step of the process finish: on success, the target's IO
 * channel becomes the regular channel for its slot, then per-channel
 * process state is cleaned up.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* The target is now a regular base bdev - hand over its channel */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2520 
/*
 * Called when the raid bdev has been quiesced for the process finish.
 * Detaches the process from the raid bdev and starts per-channel cleanup.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): returning here leaves the process stuck in the
		 * STOPPING state with no further progress - confirm intended. */
		return;
	}

	raid_bdev->process = NULL;
	process->target->is_process_target = false;

	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2538 
2539 static void
2540 _raid_bdev_process_finish(void *ctx)
2541 {
2542 	struct raid_bdev_process *process = ctx;
2543 	int rc;
2544 
2545 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2546 			       raid_bdev_process_finish_quiesced, process);
2547 	if (rc != 0) {
2548 		raid_bdev_process_finish_quiesced(ctx, rc);
2549 	}
2550 }
2551 
/*
 * Kick off the process finish sequence on the app thread, where the raid
 * bdev may be quiesced.
 */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2557 
2558 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2559 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2560 
/*
 * Begin stopping the process with the given status. Safe to call multiple
 * times - only the first non-zero status is kept and the stop sequence is
 * started only once. Must be called on the process thread.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	/* Already stopping or stopped - nothing more to do */
	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Release the current window first, if one is locked */
	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2583 
2584 static void
2585 raid_bdev_process_window_range_unlocked(void *ctx, int status)
2586 {
2587 	struct raid_bdev_process *process = ctx;
2588 
2589 	if (status != 0) {
2590 		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
2591 		raid_bdev_process_finish(process, status);
2592 		return;
2593 	}
2594 
2595 	process->window_range_locked = false;
2596 	process->window_offset += process->window_size;
2597 
2598 	raid_bdev_process_thread_run(process);
2599 }
2600 
2601 static void
2602 raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
2603 {
2604 	int rc;
2605 
2606 	assert(process->window_range_locked == true);
2607 
2608 	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
2609 				       process->window_offset, process->max_window_size,
2610 				       raid_bdev_process_window_range_unlocked, process);
2611 	if (rc != 0) {
2612 		raid_bdev_process_window_range_unlocked(process, rc);
2613 	}
2614 }
2615 
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);

	/* All channels now see the new process offset - release the old window. */
	raid_bdev_process_unlock_window_range(proc);
}
2623 
2624 static void
2625 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2626 {
2627 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2628 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2629 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2630 
2631 	raid_ch->process.offset = process->window_offset + process->window_size;
2632 
2633 	spdk_for_each_channel_continue(i, 0);
2634 }
2635 
/*
 * Completion callback for a process request submitted by the raid module.
 * Returns the request to the free pool and, once the whole window has
 * completed, either finishes the process (on error) or advances the
 * per-channel process offset.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	/* Return the request to the pool for reuse */
	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Remember a failure for the whole window */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2661 
/*
 * Submit one process request covering up to num_blocks starting at
 * offset_blocks. Returns the number of blocks actually submitted, 0 if no
 * free request is available, or a negative errno on submission failure.
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests in flight - part of the window must be pending */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may submit fewer blocks than requested */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2697 
/*
 * Fill the current window with process requests. The window ends at the
 * smaller of window_offset + max_window_size and the raid bdev size. If
 * nothing could be submitted, the process is finished with the window
 * status.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			/* Out of free requests, or a submission error occurred */
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* The effective window size is what was actually submitted */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2722 
2723 static void
2724 raid_bdev_process_window_range_locked(void *ctx, int status)
2725 {
2726 	struct raid_bdev_process *process = ctx;
2727 
2728 	if (status != 0) {
2729 		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
2730 		raid_bdev_process_finish(process, status);
2731 		return;
2732 	}
2733 
2734 	process->window_range_locked = true;
2735 
2736 	if (process->state == RAID_PROCESS_STATE_STOPPING) {
2737 		raid_bdev_process_unlock_window_range(process);
2738 		return;
2739 	}
2740 
2741 	_raid_bdev_process_thread_run(process);
2742 }
2743 
/*
 * Main loop step of the process thread: stop if requested, finish when the
 * end of the raid bdev is reached, otherwise quiesce the next window and
 * continue processing once the range lock is granted.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window to the remaining size of the raid bdev */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2775 
2776 static void
2777 raid_bdev_process_thread_init(void *ctx)
2778 {
2779 	struct raid_bdev_process *process = ctx;
2780 	struct raid_bdev *raid_bdev = process->raid_bdev;
2781 	struct spdk_io_channel *ch;
2782 
2783 	process->thread = spdk_get_thread();
2784 
2785 	ch = spdk_get_io_channel(raid_bdev);
2786 	if (ch == NULL) {
2787 		process->status = -ENOMEM;
2788 		raid_bdev_process_do_finish(process);
2789 		return;
2790 	}
2791 
2792 	process->raid_ch = spdk_io_channel_get_ctx(ch);
2793 	process->state = RAID_PROCESS_STATE_RUNNING;
2794 
2795 	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
2796 		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
2797 
2798 	raid_bdev_process_thread_run(process);
2799 }
2800 
/*
 * Called after all channels aborted the process start. The target base
 * bdev is removed and the never-started process is freed.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2811 
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch;

	raid_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	/* Undo the per-channel process setup done during start. */
	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2822 
/*
 * Called after every channel completed its process setup. On success, a
 * dedicated SPDK thread is created for the process; on any failure the
 * per-channel setup is rolled back.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	/* Re-check the target's state - it may have changed during channel setup */
	if (status == 0 &&
	    (process->target->remove_scheduled || !process->target->is_configured ||
	     raid_bdev->num_base_bdevs_operational <= raid_bdev->min_base_bdevs_operational)) {
		/* a base bdev was removed before we got here */
		status = -ENODEV;
	}

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	/* Hand the process over to its dedicated thread */
	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2864 
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch;
	int rc;

	raid_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	/* A setup failure is reported through the iteration status. */
	rc = raid_bdev_ch_process_setup(raid_ch, proc);

	spdk_for_each_channel_continue(i, rc);
}
2877 
2878 static void
2879 raid_bdev_process_start(struct raid_bdev_process *process)
2880 {
2881 	struct raid_bdev *raid_bdev = process->raid_bdev;
2882 
2883 	assert(raid_bdev->module->submit_process_request != NULL);
2884 
2885 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
2886 			      raid_bdev_channels_start_process_done);
2887 }
2888 
2889 static void
2890 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
2891 {
2892 	spdk_dma_free(process_req->iov.iov_base);
2893 	spdk_dma_free(process_req->md_buf);
2894 	free(process_req);
2895 }
2896 
/*
 * Allocate a single process request with a data buffer sized for a full
 * window and, if the raid bdev uses separate metadata, a metadata buffer.
 * Returns NULL on allocation failure.
 */
static struct raid_bdev_process_request *
raid_bdev_process_alloc_request(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;

	process_req = calloc(1, sizeof(*process_req));
	if (process_req == NULL) {
		return NULL;
	}

	process_req->process = process;
	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
	if (process_req->iov.iov_base == NULL) {
		free(process_req);
		return NULL;
	}
	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
		if (process_req->md_buf == NULL) {
			/* iov_base is set, so the common free path can be used */
			raid_bdev_process_request_free(process_req);
			return NULL;
		}
	}

	return process_req;
}
2925 
2926 static void
2927 raid_bdev_process_free(struct raid_bdev_process *process)
2928 {
2929 	struct raid_bdev_process_request *process_req;
2930 
2931 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2932 		TAILQ_REMOVE(&process->requests, process_req, link);
2933 		raid_bdev_process_request_free(process_req);
2934 	}
2935 
2936 	free(process);
2937 }
2938 
/*
 * Allocate a process descriptor and its pool of requests. The window size
 * is derived from the configured size in KB, rounded up to whole data
 * blocks and to at least one write unit. Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	/* Preallocate the full queue depth of requests */
	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			/* Frees any requests allocated so far */
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
2973 
2974 static int
2975 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
2976 {
2977 	struct raid_bdev_process *process;
2978 
2979 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2980 
2981 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
2982 	if (process == NULL) {
2983 		return -ENOMEM;
2984 	}
2985 
2986 	raid_bdev_process_start(process);
2987 
2988 	return 0;
2989 }
2990 
2991 static void raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info);
2992 
/*
 * Completion callback of the spdk_for_each_channel() synchronization issued in
 * raid_bdev_configure_base_bdev_cont() - resume base bdev configuration.
 */
static void
_raid_bdev_configure_base_bdev_cont(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_configure_base_bdev_cont(base_info);
}
3000 
/*
 * No-op per-channel callback - the iteration is used purely to synchronize
 * with all io channels (see raid_bdev_configure_base_bdev_cont()).
 */
static void
raid_bdev_ch_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
3006 
/*
 * Continue configuring a base bdev after it has been opened/claimed and its
 * superblock (if any) checked. Depending on the raid bdev's state this either:
 *  - marks the bdev as a rebuild target (bdev added to an already complete,
 *    online array), synchronizing with all channels first,
 *  - configures the whole raid bdev once all operational members are present,
 *  - starts a rebuild for a process target, or
 *  - simply records the bdev as discovered.
 * The per-bdev configure_cb, if set, is invoked with the result.
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational &&
	    base_info->is_process_target == false) {
		/* TODO: defer if rebuild in progress on another base bdev */
		assert(raid_bdev->process == NULL);
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		base_info->is_process_target = true;
		/* To assure is_process_target is set before is_configured when checked in raid_bdev_create_cb() */
		spdk_for_each_channel(raid_bdev, raid_bdev_ch_sync, base_info, _raid_bdev_configure_base_bdev_cont);
		return;
	}

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (base_info->is_process_target) {
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		rc = 0;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
3057 
3058 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3059 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3060 
/*
 * Callback for the superblock read issued from raid_bdev_configure_base_bdev()
 * for a newly added (non-existing) base bdev.
 *
 * status == 0:       a valid superblock was found. If it belongs to this raid
 *                    bdev, drop the current claim and re-run examination via
 *                    the superblock path; a foreign superblock fails with
 *                    -EEXIST.
 * status == -EINVAL: no superblock present - continue plain configuration.
 * otherwise:         the read itself failed; report the error to configure_cb.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	switch (status) {
	case 0:
		/* valid superblock found */
		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);

			/* release the claim before re-examining through the sb path */
			raid_bdev_free_base_bdev_resource(base_info);
			raid_bdev_examine_sb(sb, bdev, base_info->configure_cb, base_info->configure_cb_ctx);
			return;
		}
		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, status);
	}
}
3095 
/*
 * Open, claim and validate a base bdev, then continue its configuration.
 * Must be called on the app thread.
 *
 * base_info - slot to configure; its name and/or uuid select the bdev
 * existing  - true when the bdev is a known member (no superblock check is
 *             performed); false for a newly added bdev, whose superblock is
 *             read asynchronously before configuration continues
 * cb_fn/cb_ctx - stored as the slot's configure callback; invoked (possibly
 *             asynchronously) when configuration completes
 *
 * Returns 0 on success or if configuration continues asynchronously; a
 * negative errno otherwise, after releasing any acquired resources.
 */
static int
raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
			      raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	const struct spdk_uuid *bdev_uuid;
	int rc;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->desc == NULL);

	/*
	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
	 * before claiming the bdev.
	 */

	if (!spdk_uuid_is_null(&base_info->uuid)) {
		char uuid_str[SPDK_UUID_STRING_LEN];
		const char *bdev_name;

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* UUID of a bdev is registered as its alias */
		bdev = spdk_bdev_get_by_name(uuid_str);
		if (bdev == NULL) {
			return -ENODEV;
		}

		bdev_name = spdk_bdev_get_name(bdev);

		if (base_info->name == NULL) {
			assert(existing == true);
			base_info->name = strdup(bdev_name);
			if (base_info->name == NULL) {
				return -ENOMEM;
			}
		} else if (strcmp(base_info->name, bdev_name) != 0) {
			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
				    bdev_name, base_info->name);
			return -EINVAL;
		}
	}

	assert(base_info->name != NULL);

	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
	if (rc != 0) {
		if (rc != -ENODEV) {
			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
		}
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	bdev_uuid = spdk_bdev_get_uuid(bdev);

	if (spdk_uuid_is_null(&base_info->uuid)) {
		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
		spdk_bdev_close(desc);
		return -EINVAL;
	}

	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
		spdk_bdev_close(desc);
		return rc;
	}

	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);

	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
	if (base_info->app_thread_ch == NULL) {
		SPDK_ERRLOG("Failed to get io channel\n");
		spdk_bdev_module_release_bdev(bdev);
		spdk_bdev_close(desc);
		return -ENOMEM;
	}

	base_info->desc = desc;
	base_info->blockcnt = bdev->blockcnt;

	/*
	 * With a superblock, data starts at a fixed minimum offset (or the
	 * offset recorded in the superblock), possibly rounded up to the base
	 * bdev's optimal io boundary.
	 */
	if (raid_bdev->superblock_enabled) {
		uint64_t data_offset;

		if (base_info->data_offset == 0) {
			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
		} else {
			data_offset = base_info->data_offset;
		}

		if (bdev->optimal_io_boundary != 0) {
			data_offset = spdk_divide_round_up(data_offset,
							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
				/* NOTE(review): %lu assumes 64-bit long; PRIu64 would be portable */
				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
					     base_info->data_offset, base_info->name, data_offset);
				data_offset = base_info->data_offset;
			}
		}

		base_info->data_offset = data_offset;
	}

	if (base_info->data_offset >= bdev->blockcnt) {
		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
			    base_info->data_offset, bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (base_info->data_size == 0) {
		base_info->data_size = bdev->blockcnt - base_info->data_offset;
	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
			    bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
			    bdev->name);
		rc = -EINVAL;
		goto out;
	}

	/*
	 * Set the raid bdev properties if this is the first base bdev configured,
	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
	 * have the same blocklen and metadata format.
	 */
	if (raid_bdev->bdev.blocklen == 0) {
		raid_bdev->bdev.blocklen = bdev->blocklen;
		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
	} else {
		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
			rc = -EINVAL;
			goto out;
		}

		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev)) {
			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
				    raid_bdev->bdev.name, bdev->name);
			rc = -EINVAL;
			goto out;
		}
	}

	base_info->configure_cb = cb_fn;
	base_info->configure_cb_ctx = cb_ctx;

	if (existing) {
		raid_bdev_configure_base_bdev_cont(base_info);
	} else {
		/* check for existing superblock when using a new bdev */
		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
		if (rc) {
			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
				    bdev->name, spdk_strerror(-rc));
		}
	}
out:
	if (rc != 0) {
		raid_bdev_free_base_bdev_resource(base_info);
	}
	return rc;
}
3280 
/*
 * Add the named bdev to a slot of the raid bdev and configure it.
 * Must be called on the app thread.
 *
 * Slot selection: while the array is still CONFIGURING, prefer a slot whose
 * uuid matches the bdev (name not yet set); otherwise - including the ONLINE
 * case (re-adding to a running array) - take the first fully empty slot.
 *
 * Returns 0 on success (configuration may complete asynchronously via cb_fn),
 * -EPERM if a background process is running, -EINVAL if no suitable slot
 * exists, or another negative errno from configuration.
 */
int
raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
			raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_base_bdev_info *base_info = NULL, *iter;
	int rc;

	assert(name != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->process != NULL) {
		SPDK_ERRLOG("raid bdev '%s' is in process\n",
			    raid_bdev->bdev.name);
		return -EPERM;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);

		if (bdev != NULL) {
			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
				if (iter->name == NULL &&
				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
					base_info = iter;
					break;
				}
			}
		}
	}

	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
				base_info = iter;
				break;
			}
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
			    raid_bdev->bdev.name, name);
		return -EINVAL;
	}

	assert(base_info->is_configured == false);

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		assert(base_info->data_size != 0);
		assert(base_info->desc == NULL);
	}

	base_info->name = strdup(name);
	if (base_info->name == NULL) {
		return -ENOMEM;
	}

	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
		/* -ENODEV while CONFIGURING is tolerated - the bdev may appear later */
		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
		free(base_info->name);
		base_info->name = NULL;
	}

	return rc;
}
3347 
/*
 * Recreate a raid bdev from an on-disk superblock: create the bdev shell,
 * copy the superblock into it, and populate each base bdev slot's uuid and
 * data offset/size. Slots in CONFIGURED state count towards the number of
 * operational base bdevs.
 *
 * Returns 0 and the new raid bdev in *raid_bdev_out, or a negative errno.
 */
static int
raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
{
	struct raid_bdev *raid_bdev;
	uint8_t i;
	int rc;

	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
			       sb->level, true, &sb->uuid, &raid_bdev);
	if (rc != 0) {
		return rc;
	}

	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
	if (rc != 0) {
		raid_bdev_free(raid_bdev);
		return rc;
	}

	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
	memcpy(raid_bdev->sb, sb, sb->length);

	for (i = 0; i < sb->base_bdevs_size; i++) {
		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];

		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
			raid_bdev->num_base_bdevs_operational++;
		}

		base_info->data_offset = sb_base_bdev->data_offset;
		base_info->data_size = sb_base_bdev->data_size;
	}

	*raid_bdev_out = raid_bdev;
	return 0;
}
3386 
3387 static void
3388 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3389 {
3390 	struct raid_bdev *raid_bdev;
3391 	struct raid_base_bdev_info *base_info;
3392 
3393 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3394 		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3395 			continue;
3396 		}
3397 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3398 			if (base_info->desc == NULL &&
3399 			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3400 			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3401 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3402 				break;
3403 			}
3404 		}
3405 	}
3406 }
3407 
/* Context for examining the remaining base bdevs of a raid bdev found via superblock. */
struct raid_bdev_examine_others_ctx {
	/* uuid of the raid bdev whose members are being examined */
	struct spdk_uuid raid_bdev_uuid;
	/* slot index from which the next scan iteration resumes */
	uint8_t current_base_bdev_idx;
	/* original caller's completion callback and argument */
	raid_base_bdev_cb cb_fn;
	void *cb_ctx;
};
3414 
/* Finish the examine-others sequence: report status to the caller and free the context. */
static void
raid_bdev_examine_others_done(void *_ctx, int status)
{
	struct raid_bdev_examine_others_ctx *ctx = _ctx;

	if (ctx->cb_fn != NULL) {
		ctx->cb_fn(ctx->cb_ctx, status);
	}
	free(ctx);
}
3425 
3426 typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
3427 		const struct raid_bdev_superblock *sb, int status, void *ctx);
3428 static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
3429 				     void *cb_ctx);
3430 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3431 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3432 static void raid_bdev_examine_others(void *_ctx, int status);
3433 
3434 static void
3435 raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3436 				 int status, void *_ctx)
3437 {
3438 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3439 
3440 	if (status != 0) {
3441 		raid_bdev_examine_others_done(ctx, status);
3442 		return;
3443 	}
3444 
3445 	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3446 }
3447 
/*
 * After one base bdev is configured, look for other registered bdevs that are
 * members of the same raid bdev (matched by uuid) and examine them one by one.
 * Each successful superblock load re-enters this function via
 * raid_bdev_examine_others_load_cb(), resuming from current_base_bdev_idx.
 * When no more candidates remain (or on error), the sequence is finalized
 * through raid_bdev_examine_others_done().
 */
static void
raid_bdev_examine_others(void *_ctx, int status)
{
	struct raid_bdev_examine_others_ctx *ctx = _ctx;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	if (status != 0) {
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
	if (raid_bdev == NULL) {
		status = -ENODEV;
		goto out;
	}

	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
	     base_info++) {
		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
			continue;
		}

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* the member bdev is looked up by its uuid alias; skip if not registered yet */
		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
			continue;
		}

		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);

		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
		if (status != 0) {
			continue;
		}
		/* continues asynchronously in raid_bdev_examine_others_load_cb() */
		return;
	}
out:
	raid_bdev_examine_others_done(ctx, status);
}
3490 
/*
 * Examine a bdev that carries a raid superblock. Validates the superblock
 * against the bdev, reconciles superblock versions with any existing raid
 * bdev of the same uuid (newer on-disk sb wins while CONFIGURING; older is
 * ignored in favor of the in-memory one), creates the raid bdev from the
 * superblock if it does not exist yet, and finally configures this bdev as
 * the matching member. On any failure cb_fn (if set) is called with the error.
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
		     raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
		rc = -EINVAL;
		goto out;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		rc = -EINVAL;
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);

	if (raid_bdev) {
		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				rc = -EBUSY;
				goto out;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* locate this bdev's entry in the superblock by uuid */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev) {
		struct raid_bdev_examine_others_ctx *ctx;

		ctx = calloc(1, sizeof(*ctx));
		if (ctx == NULL) {
			rc = -ENOMEM;
			goto out;
		}

		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			free(ctx);
			goto out;
		}

		/* after this base bdev is configured, examine other base bdevs that may be present */
		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
		ctx->cb_fn = cb_fn;
		ctx->cb_ctx = cb_ctx;

		/* chain the caller's callback behind the examine-others pass */
		cb_fn = raid_bdev_examine_others;
		cb_ctx = ctx;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		/* re-adding a previously missing/failed member to a running array */
		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
		assert(base_info->is_configured == false);
		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
		assert(spdk_uuid_is_null(&base_info->uuid));
		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
		}
		goto out;
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		rc = -EINVAL;
		goto out;
	}

	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		rc = -EINVAL;
		goto out;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
out:
	if (rc != 0 && cb_fn != 0) {
		cb_fn(cb_ctx, rc);
	}
}
3632 
/* Context for an asynchronous superblock load during examine. */
struct raid_bdev_examine_ctx {
	/* short-lived read-only descriptor on the examined bdev */
	struct spdk_bdev_desc *desc;
	/* io channel used for the superblock read */
	struct spdk_io_channel *ch;
	/* completion callback and its argument */
	raid_bdev_examine_load_sb_cb cb;
	void *cb_ctx;
};
3639 
3640 static void
3641 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3642 {
3643 	if (!ctx) {
3644 		return;
3645 	}
3646 
3647 	if (ctx->ch) {
3648 		spdk_put_io_channel(ctx->ch);
3649 	}
3650 
3651 	if (ctx->desc) {
3652 		spdk_bdev_close(ctx->desc);
3653 	}
3654 
3655 	free(ctx);
3656 }
3657 
3658 static void
3659 raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3660 {
3661 	struct raid_bdev_examine_ctx *ctx = _ctx;
3662 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3663 
3664 	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3665 
3666 	raid_bdev_examine_ctx_free(ctx);
3667 }
3668 
/* Event callback for the short-lived examine descriptor - intentionally a no-op. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3673 
/*
 * Asynchronously load the raid superblock from the named bdev. Opens a
 * read-only descriptor and io channel, then issues the read; cb is invoked
 * from raid_bdev_examine_load_sb_done() with the result, after which the
 * context is freed. Returns 0 if the read was issued, or a negative errno
 * (in which case cb is not called and all resources are released here).
 */
static int
raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
{
	struct raid_bdev_examine_ctx *ctx;
	int rc;

	assert(cb != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return -ENOMEM;
	}

	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
	if (rc) {
		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
		goto err;
	}

	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
	if (!ctx->ch) {
		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
		rc = -ENOMEM;
		goto err;
	}

	/* cb/cb_ctx must be set before the read is issued - the completion uses them */
	ctx->cb = cb;
	ctx->cb_ctx = cb_ctx;

	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
	if (rc) {
		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
			    bdev_name, spdk_strerror(-rc));
		goto err;
	}

	return 0;
err:
	raid_bdev_examine_ctx_free(ctx);
	return rc;
}
3715 
3716 static void
3717 raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3718 		       void *ctx)
3719 {
3720 	switch (status) {
3721 	case 0:
3722 		/* valid superblock found */
3723 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3724 		raid_bdev_examine_sb(sb, bdev, NULL, NULL);
3725 		break;
3726 	case -EINVAL:
3727 		/* no valid superblock, check if it can be claimed anyway */
3728 		raid_bdev_examine_no_sb(bdev);
3729 		break;
3730 	default:
3731 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3732 			    bdev->name, spdk_strerror(-status));
3733 		break;
3734 	}
3735 
3736 	spdk_bdev_module_examine_done(&g_raid_if);
3737 }
3738 
3739 /*
3740  * brief:
3741  * raid_bdev_examine function is the examine function call by the below layers
3742  * like bdev_nvme layer. This function will check if this base bdev can be
3743  * claimed by this raid bdev or not.
3744  * params:
3745  * bdev - pointer to base bdev
3746  * returns:
3747  * none
3748  */
3749 static void
3750 raid_bdev_examine(struct spdk_bdev *bdev)
3751 {
3752 	int rc;
3753 
3754 	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
3755 		goto done;
3756 	}
3757 
3758 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3759 		raid_bdev_examine_no_sb(bdev);
3760 		goto done;
3761 	}
3762 
3763 	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
3764 	if (rc != 0) {
3765 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3766 			    bdev->name, spdk_strerror(-rc));
3767 		goto done;
3768 	}
3769 
3770 	return;
3771 done:
3772 	spdk_bdev_module_examine_done(&g_raid_if);
3773 }
3774 
3775 /* Log component for bdev raid bdev module */
3776 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3777