xref: /spdk/module/bdev/raid/bdev_raid.c (revision 33712560bf41a135559d7731fd55583c645ca714)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
16 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
17 #define RAID_BDEV_PROCESS_MAX_QD	16
18 
19 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT	1024
20 #define RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT	0
21 
22 static bool g_shutdown_started = false;
23 
24 /* List of all raid bdevs */
25 struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
26 
27 static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
28 
29 /*
30  * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
31  * contains the relationship of raid bdev io channel with base bdev io channels.
32  */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs, indexed by base bdev slot.
	 * An entry is NULL when the base bdev is missing or is the target of a
	 * background process (see raid_bdev_create_cb()). */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Process window offset in blocks; blocks below this offset have been
		 * processed. RAID_OFFSET_BLOCKS_INVALID when no process is active. */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Shadow channel for the already-processed range, in which the process
		 * target replaces the base bdev it is rebuilding */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
47 
/* Lifecycle states of a raid background process (e.g. rebuild). */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
54 
/* Token-bucket style bandwidth limiting state for a background process.
 * NOTE(review): the accounting fields are consumed outside this chunk - the
 * descriptions below are inferred from the field names; confirm against the
 * process QoS implementation. */
struct raid_process_qos {
	/* True when a max bandwidth limit is configured */
	bool enable_qos;
	/* TSC timestamp of the last replenish */
	uint64_t last_tsc;
	/* Replenish rate, in bytes per TSC tick */
	double bytes_per_tsc;
	/* Currently available byte budget */
	double bytes_available;
	/* Upper bound of the byte budget */
	double bytes_max;
	/* Poller used to resume the process once budget is available again */
	struct spdk_poller *process_continue_poller;
};
63 
/* State of a raid background process (e.g. rebuild) running on a raid bdev. */
struct raid_bdev_process {
	struct raid_bdev		*raid_bdev;
	enum raid_process_type		type;
	enum raid_bdev_process_state	state;
	/* Thread on which the process runs */
	struct spdk_thread		*thread;
	/* Raid io channel used by the process itself */
	struct raid_bdev_io_channel	*raid_ch;
	/* Pool of preallocated process requests (queue depth RAID_BDEV_PROCESS_MAX_QD) */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	uint64_t			max_window_size;
	/* Current window size in blocks */
	uint64_t			window_size;
	/* Blocks remaining to complete in the current window */
	uint64_t			window_remaining;
	/* First error status seen while processing the current window */
	int				window_status;
	/* Offset in blocks of the current window */
	uint64_t			window_offset;
	/* True while the current window range is locked (quiesced) */
	bool				window_range_locked;
	/* Base bdev being rebuilt to */
	struct raid_base_bdev_info	*target;
	/* Overall process status */
	int				status;
	/* Callbacks to invoke when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
	/* Bandwidth limiting state */
	struct raid_process_qos		qos;
};
82 
/* A single deferred callback executed when a background process finishes. */
struct raid_process_finish_action {
	spdk_msg_fn cb;
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
88 
/* Module-wide options, read/written via raid_bdev_get_opts()/raid_bdev_set_opts(). */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
	.process_max_bandwidth_mb_sec = RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT,
};
93 
94 void
95 raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
96 {
97 	*opts = g_opts;
98 }
99 
100 int
101 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
102 {
103 	if (opts->process_window_size_kb == 0) {
104 		return -EINVAL;
105 	}
106 
107 	g_opts = *opts;
108 
109 	return 0;
110 }
111 
112 static struct raid_bdev_module *
113 raid_bdev_module_find(enum raid_level level)
114 {
115 	struct raid_bdev_module *raid_module;
116 
117 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
118 		if (raid_module->level == level) {
119 			return raid_module;
120 		}
121 	}
122 
123 	return NULL;
124 }
125 
126 void
127 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
128 {
129 	if (raid_bdev_module_find(raid_module->level) != NULL) {
130 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
131 			    raid_bdev_level_to_str(raid_module->level));
132 		assert(false);
133 	} else {
134 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
135 	}
136 }
137 
138 struct spdk_io_channel *
139 raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
140 {
141 	return raid_ch->base_channel[idx];
142 }
143 
144 void *
145 raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
146 {
147 	assert(raid_ch->module_channel != NULL);
148 
149 	return spdk_io_channel_get_ctx(raid_ch->module_channel);
150 }
151 
152 struct raid_base_bdev_info *
153 raid_bdev_channel_get_base_info(struct raid_bdev_io_channel *raid_ch, struct spdk_bdev *base_bdev)
154 {
155 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
156 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
157 	uint8_t i;
158 
159 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
160 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[i];
161 
162 		if (base_info->is_configured &&
163 		    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
164 			return base_info;
165 		}
166 	}
167 
168 	return NULL;
169 }
170 
171 /* Function declarations */
172 static void	raid_bdev_examine(struct spdk_bdev *bdev);
173 static int	raid_bdev_init(void);
174 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
175 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
176 
177 static void
178 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
179 {
180 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
181 
182 	if (raid_ch->process.target_ch != NULL) {
183 		spdk_put_io_channel(raid_ch->process.target_ch);
184 		raid_ch->process.target_ch = NULL;
185 	}
186 
187 	if (raid_ch->process.ch_processed != NULL) {
188 		free(raid_ch->process.ch_processed->base_channel);
189 		free(raid_ch->process.ch_processed);
190 		raid_ch->process.ch_processed = NULL;
191 	}
192 }
193 
/*
 * Set up the per-channel state for an active background process: get an IO
 * channel for the process target and build a shadow raid channel representing
 * the already-processed range, in which the target replaces the base bdev it
 * is rebuilding. Returns 0 on success, -ENOMEM on any allocation failure.
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	/* I/O below this offset is routed to the processed (shadow) channel */
	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			/* Borrowed reference from the parent channel - not released here */
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			/* The target stands in for the base bdev it is replacing */
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The shadow channel itself never carries nested process state */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	/* Releases whatever was set up above; safe on partial setup */
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
243 
244 /*
245  * brief:
246  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
247  * hierarchy from raid bdev to base bdev io channels. It will be called per core
248  * params:
249  * io_device - pointer to raid bdev io device represented by raid_bdev
250  * ctx_buf - pointer to context buffer for raid bdev io channel
251  * returns:
252  * 0 - success
253  * non zero - failure
254  */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	/* Default for the module/process setup failure paths below */
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].is_configured == false ||
		    raid_bdev->base_bdev_info[i].is_process_target == true) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	/* Module IO channel is optional - only when the module implements it */
	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		/* No background process - mark the whole range as "no process active" */
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}

	return 0;
err:
	/* Release every base channel acquired so far (NULL entries were skipped) */
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
325 
326 /*
327  * brief:
328  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
329  * hierarchy from raid bdev to base bdev io channels. It will be called per core
330  * params:
331  * io_device - pointer to raid bdev io device represented by raid_bdev
332  * ctx_buf - pointer to context buffer for raid bdev io channel
333  * returns:
334  * none
335  */
336 static void
337 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
338 {
339 	struct raid_bdev *raid_bdev = io_device;
340 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
341 	uint8_t i;
342 
343 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
344 
345 	assert(raid_ch != NULL);
346 	assert(raid_ch->base_channel);
347 
348 	if (raid_ch->module_channel) {
349 		spdk_put_io_channel(raid_ch->module_channel);
350 	}
351 
352 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
353 		/* Free base bdev channels */
354 		if (raid_ch->base_channel[i] != NULL) {
355 			spdk_put_io_channel(raid_ch->base_channel[i]);
356 		}
357 	}
358 	free(raid_ch->base_channel);
359 	raid_ch->base_channel = NULL;
360 
361 	raid_bdev_ch_process_cleanup(raid_ch);
362 }
363 
364 /*
365  * brief:
366  * raid_bdev_cleanup is used to cleanup raid_bdev related data
367  * structures.
368  * params:
369  * raid_bdev - pointer to raid_bdev
370  * returns:
371  * none
372  */
373 static void
374 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
375 {
376 	struct raid_base_bdev_info *base_info;
377 
378 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
379 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
380 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
381 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
382 
383 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
384 		assert(base_info->desc == NULL);
385 		free(base_info->name);
386 	}
387 
388 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
389 }
390 
391 static void
392 raid_bdev_free(struct raid_bdev *raid_bdev)
393 {
394 	raid_bdev_free_superblock(raid_bdev);
395 	free(raid_bdev->base_bdev_info);
396 	free(raid_bdev->bdev.name);
397 	free(raid_bdev);
398 }
399 
/* Convenience helper: unlink the raid bdev from the module state and free it. */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
406 
407 static void
408 raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
409 {
410 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
411 
412 	assert(base_info->is_configured);
413 	assert(raid_bdev->num_base_bdevs_discovered);
414 	raid_bdev->num_base_bdevs_discovered--;
415 	base_info->is_configured = false;
416 	base_info->is_process_target = false;
417 }
418 
419 /*
420  * brief:
421  * free resource of base bdev for raid bdev
422  * params:
423  * base_info - raid base bdev info
424  * returns:
425  * none
426  */
/*
 * Release everything this raid bdev holds for one base bdev: its name, module
 * claim, descriptor and app-thread IO channel; deconfigure it if needed.
 * Must run on the app thread. Safe to call when the descriptor was never
 * opened (desc == NULL).
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->configure_cb == NULL);

	free(base_info->name);
	base_info->name = NULL;
	/* While still configuring, keep the uuid so the slot can be re-discovered */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}
	base_info->is_failed = false;

	if (base_info->desc == NULL) {
		return;
	}

	/* Release the module claim before closing the descriptor it was taken on */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		raid_bdev_deconfigure_base_bdev(base_info);
	}
}
456 
457 static void
458 raid_bdev_io_device_unregister_cb(void *io_device)
459 {
460 	struct raid_bdev *raid_bdev = io_device;
461 
462 	if (raid_bdev->num_base_bdevs_discovered == 0) {
463 		/* Free raid_bdev when there are no base bdevs left */
464 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
465 		raid_bdev_cleanup(raid_bdev);
466 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
467 		raid_bdev_free(raid_bdev);
468 	} else {
469 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
470 	}
471 }
472 
473 void
474 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
475 {
476 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
477 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
478 	}
479 }
480 
481 static void
482 _raid_bdev_destruct(void *ctxt)
483 {
484 	struct raid_bdev *raid_bdev = ctxt;
485 	struct raid_base_bdev_info *base_info;
486 
487 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");
488 
489 	assert(raid_bdev->process == NULL);
490 
491 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
492 		/*
493 		 * Close all base bdev descriptors for which call has come from below
494 		 * layers.  Also close the descriptors if we have started shutdown.
495 		 */
496 		if (g_shutdown_started || base_info->remove_scheduled == true) {
497 			raid_bdev_free_base_bdev_resource(base_info);
498 		}
499 	}
500 
501 	if (g_shutdown_started) {
502 		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
503 	}
504 
505 	if (raid_bdev->module->stop != NULL) {
506 		if (raid_bdev->module->stop(raid_bdev) == false) {
507 			return;
508 		}
509 	}
510 
511 	raid_bdev_module_stop_done(raid_bdev);
512 }
513 
514 static int
515 raid_bdev_destruct(void *ctx)
516 {
517 	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);
518 
519 	return 1;
520 }
521 
522 int
523 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
524 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
525 {
526 	struct spdk_dif_ctx dif_ctx;
527 	struct spdk_dif_error err_blk = {};
528 	int rc;
529 	struct spdk_dif_ctx_init_ext_opts dif_opts;
530 	struct iovec md_iov = {
531 		.iov_base	= md_buf,
532 		.iov_len	= num_blocks * bdev->md_len,
533 	};
534 
535 	if (md_buf == NULL) {
536 		return 0;
537 	}
538 
539 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
540 	dif_opts.dif_pi_format = bdev->dif_pi_format;
541 	rc = spdk_dif_ctx_init(&dif_ctx,
542 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
543 			       bdev->dif_is_head_of_md, bdev->dif_type,
544 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
545 			       0, 0, 0, 0, 0, &dif_opts);
546 	if (rc != 0) {
547 		SPDK_ERRLOG("Initialization of DIF context failed\n");
548 		return rc;
549 	}
550 
551 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
552 
553 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
554 	if (rc != 0) {
555 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
556 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
557 	}
558 
559 	return rc;
560 }
561 
562 int
563 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
564 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
565 {
566 	struct spdk_dif_ctx dif_ctx;
567 	struct spdk_dif_error err_blk = {};
568 	int rc;
569 	struct spdk_dif_ctx_init_ext_opts dif_opts;
570 	struct iovec md_iov = {
571 		.iov_base	= md_buf,
572 		.iov_len	= num_blocks * bdev->md_len,
573 	};
574 
575 	if (md_buf == NULL) {
576 		return 0;
577 	}
578 
579 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
580 	dif_opts.dif_pi_format = bdev->dif_pi_format;
581 	rc = spdk_dif_ctx_init(&dif_ctx,
582 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
583 			       bdev->dif_is_head_of_md, bdev->dif_type,
584 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
585 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
586 	if (rc != 0) {
587 		SPDK_ERRLOG("Initialization of DIF context failed\n");
588 		return rc;
589 	}
590 
591 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
592 	if (rc != 0) {
593 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
594 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
595 	}
596 
597 	return rc;
598 }
599 
/*
 * Complete a raid I/O with the given status. For an I/O that was split across
 * the processed/unprocessed boundary (see raid_bdev_io_split()), the first
 * completion resubmits the remaining (lower-LBA) part on the processed channel
 * and the original request parameters are restored before the final
 * completion. On a successful READ, reference tags are remapped back to the
 * parent bdev's offsets when DIF reftag checking is enabled.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original (parent) offset and md_buf for the second part */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* raid_io->iovs currently points into the parent's iov array;
				 * the pointer difference is the number of fully-consumed iovs */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The split iov was partially consumed: count it and shrink
					 * it to the part belonging to the second (lower-LBA) half */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The second part covers the processed range - use the shadow channel */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Final completion of a split I/O: restore the parent request state */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		/* Internal consumer (e.g. a background process) intercepts completion */
		raid_io->completion_cb(raid_io, status);
	} else {
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
663 
664 /*
665  * brief:
666  * raid_bdev_io_complete_part - signal the completion of a part of the expected
667  * base bdev IOs and complete the raid_io if this is the final expected IO.
668  * The caller should first set raid_io->base_bdev_io_remaining. This function
669  * will decrement this counter by the value of the 'completed' parameter and
670  * complete the raid_io if the counter reaches 0. The caller is free to
671  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
672  * it can represent e.g. blocks or IOs.
673  * params:
674  * raid_io - pointer to raid_bdev_io
675  * completed - the part of the raid_io that has been completed
676  * status - status of the base IO
677  * returns:
678  * true - if the raid_io is completed
679  * false - otherwise
680  */
681 bool
682 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
683 			   enum spdk_bdev_io_status status)
684 {
685 	assert(raid_io->base_bdev_io_remaining >= completed);
686 	raid_io->base_bdev_io_remaining -= completed;
687 
688 	if (status != raid_io->base_bdev_io_status_default) {
689 		raid_io->base_bdev_io_status = status;
690 	}
691 
692 	if (raid_io->base_bdev_io_remaining == 0) {
693 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
694 		return true;
695 	} else {
696 		return false;
697 	}
698 }
699 
700 /*
701  * brief:
702  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
703  * It will try to queue the IOs after storing the context to bdev wait queue logic.
704  * params:
705  * raid_io - pointer to raid_bdev_io
706  * bdev - the block device that the IO is submitted to
707  * ch - io channel
708  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
709  * returns:
710  * none
711  */
712 void
713 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
714 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
715 {
716 	raid_io->waitq_entry.bdev = bdev;
717 	raid_io->waitq_entry.cb_fn = cb_fn;
718 	raid_io->waitq_entry.cb_arg = raid_io;
719 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
720 }
721 
722 static void
723 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
724 {
725 	struct raid_bdev_io *raid_io = cb_arg;
726 
727 	spdk_bdev_free_io(bdev_io);
728 
729 	raid_bdev_io_complete_part(raid_io, 1, success ?
730 				   SPDK_BDEV_IO_STATUS_SUCCESS :
731 				   SPDK_BDEV_IO_STATUS_FAILED);
732 }
733 
734 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
735 
/* Retry entry point used when a reset submission is re-driven from the bdev
 * io_wait queue. */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	raid_bdev_submit_reset_request((struct raid_bdev_io *)_raid_io);
}
743 
744 /*
745  * brief:
746  * raid_bdev_submit_reset_request function submits reset requests
747  * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
748  * which case it will queue it for later submission
749  * params:
750  * raid_io
751  * returns:
752  * none
753  */
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* Zero remaining means this is the first (not a resumed) submission attempt */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from where the previous attempt stopped (base_bdev_io_submitted) */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev - count its part as trivially successful */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Out of bdev_ios - retry from the current position later */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
793 
/*
 * Split a raid I/O at split_offset (blocks, relative to the I/O start): the
 * request is adjusted in place to cover only the second part (the higher
 * LBAs), which is submitted first. The state needed to later submit the first
 * part and restore the original request is saved in raid_io->split (consumed
 * by raid_bdev_io_complete()).
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Advance the request to the start of the second part */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Walk the iovs to find where the split point falls */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split lands exactly on an iov boundary - nothing to adjust */
				raid_io->split.iov = NULL;
			} else {
				/* Split lands inside this iov: save its original value and
				 * shrink it to the part after the split point */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			/* Drop the fully-consumed leading iovs from the request's view */
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
831 
832 static void
833 raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
834 {
835 	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;
836 
837 	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
838 		uint64_t offset_begin = raid_io->offset_blocks;
839 		uint64_t offset_end = offset_begin + raid_io->num_blocks;
840 
841 		if (offset_end > raid_ch->process.offset) {
842 			if (offset_begin < raid_ch->process.offset) {
843 				/*
844 				 * If the I/O spans both the processed and unprocessed ranges,
845 				 * split it and first handle the unprocessed part. After it
846 				 * completes, the rest will be handled.
847 				 * This situation occurs when the process thread is not active
848 				 * or is waiting for the process window range to be locked
849 				 * (quiesced). When a window is being processed, such I/Os will be
850 				 * deferred by the bdev layer until the window is unlocked.
851 				 */
852 				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
853 					      raid_ch->process.offset, offset_begin, offset_end);
854 				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
855 			}
856 		} else {
857 			/* Use the child channel, which corresponds to the already processed range */
858 			raid_io->raid_ch = raid_ch->process.ch_processed;
859 		}
860 	}
861 
862 	raid_io->raid_bdev->module->submit_rw_request(raid_io);
863 }
864 
865 /*
866  * brief:
867  * Callback function to spdk_bdev_io_get_buf.
868  * params:
869  * ch - pointer to raid bdev io channel
870  * bdev_io - pointer to parent bdev_io on raid bdev device
871  * success - True if buffer is allocated or false otherwise.
872  * returns:
873  * none
874  */
875 static void
876 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
877 		     bool success)
878 {
879 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
880 
881 	if (!success) {
882 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
883 		return;
884 	}
885 
886 	raid_bdev_submit_rw_request(raid_io);
887 }
888 
889 void
890 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
891 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
892 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
893 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
894 {
895 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
896 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
897 
898 	raid_io->type = type;
899 	raid_io->offset_blocks = offset_blocks;
900 	raid_io->num_blocks = num_blocks;
901 	raid_io->iovs = iovs;
902 	raid_io->iovcnt = iovcnt;
903 	raid_io->memory_domain = memory_domain;
904 	raid_io->memory_domain_ctx = memory_domain_ctx;
905 	raid_io->md_buf = md_buf;
906 
907 	raid_io->raid_bdev = raid_bdev;
908 	raid_io->raid_ch = raid_ch;
909 	raid_io->base_bdev_io_remaining = 0;
910 	raid_io->base_bdev_io_submitted = 0;
911 	raid_io->completion_cb = NULL;
912 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
913 
914 	raid_bdev_io_set_default_status(raid_io, SPDK_BDEV_IO_STATUS_SUCCESS);
915 }
916 
917 /*
918  * brief:
919  * raid_bdev_submit_request function is the submit_request function pointer of
920  * raid bdev function table. This is used to submit the io on raid_bdev to below
921  * layers.
922  * params:
923  * ch - pointer to raid bdev io channel
924  * bdev_io - pointer to parent bdev_io on raid bdev device
925  * returns:
926  * none
927  */
928 static void
929 raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
930 {
931 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
932 
933 	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
934 			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
935 			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
936 			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);
937 
938 	switch (bdev_io->type) {
939 	case SPDK_BDEV_IO_TYPE_READ:
940 		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
941 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
942 		break;
943 	case SPDK_BDEV_IO_TYPE_WRITE:
944 		raid_bdev_submit_rw_request(raid_io);
945 		break;
946 
947 	case SPDK_BDEV_IO_TYPE_RESET:
948 		raid_bdev_submit_reset_request(raid_io);
949 		break;
950 
951 	case SPDK_BDEV_IO_TYPE_FLUSH:
952 	case SPDK_BDEV_IO_TYPE_UNMAP:
953 		if (raid_io->raid_bdev->process != NULL) {
954 			/* TODO: rebuild support */
955 			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
956 			return;
957 		}
958 		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
959 		break;
960 
961 	default:
962 		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
963 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
964 		break;
965 	}
966 }
967 
968 /*
969  * brief:
970  * _raid_bdev_io_type_supported checks whether io_type is supported in
971  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
972  * doesn't support, the raid device doesn't supports.
973  *
974  * params:
975  * raid_bdev - pointer to raid bdev context
976  * io_type - io type
977  * returns:
978  * true - io_type is supported
979  * false - io_type is not supported
980  */
981 inline static bool
982 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
983 {
984 	struct raid_base_bdev_info *base_info;
985 
986 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
987 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
988 		if (raid_bdev->module->submit_null_payload_request == NULL) {
989 			return false;
990 		}
991 	}
992 
993 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
994 		if (base_info->desc == NULL) {
995 			continue;
996 		}
997 
998 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
999 			return false;
1000 		}
1001 	}
1002 
1003 	return true;
1004 }
1005 
1006 /*
1007  * brief:
1008  * raid_bdev_io_type_supported is the io_supported function for bdev function
1009  * table which returns whether the particular io type is supported or not by
1010  * raid bdev module
1011  * params:
1012  * ctx - pointer to raid bdev context
1013  * type - io type
1014  * returns:
1015  * true - io_type is supported
1016  * false - io_type is not supported
1017  */
1018 static bool
1019 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1020 {
1021 	switch (io_type) {
1022 	case SPDK_BDEV_IO_TYPE_READ:
1023 	case SPDK_BDEV_IO_TYPE_WRITE:
1024 		return true;
1025 
1026 	case SPDK_BDEV_IO_TYPE_FLUSH:
1027 	case SPDK_BDEV_IO_TYPE_RESET:
1028 	case SPDK_BDEV_IO_TYPE_UNMAP:
1029 		return _raid_bdev_io_type_supported(ctx, io_type);
1030 
1031 	default:
1032 		return false;
1033 	}
1034 
1035 	return false;
1036 }
1037 
/*
 * brief:
 * raid_bdev_get_io_channel is the get_io_channel function table pointer for
 * raid bdev. It returns an I/O channel for this raid bdev.
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * pointer to io channel for raid bdev
 */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev pointer itself is the registered io_device handle */
	return spdk_get_io_channel((struct raid_bdev *)ctxt);
}
1054 
1055 void
1056 raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
1057 {
1058 	struct raid_base_bdev_info *base_info;
1059 
1060 	assert(raid_bdev != NULL);
1061 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1062 
1063 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1064 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1065 	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
1066 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1067 	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
1068 	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
1069 	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
1070 	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
1071 				     raid_bdev->num_base_bdevs_operational);
1072 	if (raid_bdev->process) {
1073 		struct raid_bdev_process *process = raid_bdev->process;
1074 		uint64_t offset = process->window_offset;
1075 
1076 		spdk_json_write_named_object_begin(w, "process");
1077 		spdk_json_write_name(w, "type");
1078 		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
1079 		spdk_json_write_named_string(w, "target", process->target->name);
1080 		spdk_json_write_named_object_begin(w, "progress");
1081 		spdk_json_write_named_uint64(w, "blocks", offset);
1082 		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
1083 		spdk_json_write_object_end(w);
1084 		spdk_json_write_object_end(w);
1085 	}
1086 	spdk_json_write_name(w, "base_bdevs_list");
1087 	spdk_json_write_array_begin(w);
1088 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1089 		spdk_json_write_object_begin(w);
1090 		spdk_json_write_name(w, "name");
1091 		if (base_info->name) {
1092 			spdk_json_write_string(w, base_info->name);
1093 		} else {
1094 			spdk_json_write_null(w);
1095 		}
1096 		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
1097 		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
1098 		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
1099 		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
1100 		spdk_json_write_object_end(w);
1101 	}
1102 	spdk_json_write_array_end(w);
1103 }
1104 
1105 /*
1106  * brief:
1107  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1108  * params:
1109  * ctx - pointer to raid_bdev
1110  * w - pointer to json context
1111  * returns:
1112  * 0 - success
1113  * non zero - failure
1114  */
1115 static int
1116 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1117 {
1118 	struct raid_bdev *raid_bdev = ctx;
1119 
1120 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1121 
1122 	/* Dump the raid bdev configuration related information */
1123 	spdk_json_write_named_object_begin(w, "raid");
1124 	raid_bdev_write_info_json(raid_bdev, w);
1125 	spdk_json_write_object_end(w);
1126 
1127 	return 0;
1128 }
1129 
1130 /*
1131  * brief:
1132  * raid_bdev_write_config_json is the function table pointer for raid bdev
1133  * params:
1134  * bdev - pointer to spdk_bdev
1135  * w - pointer to json context
1136  * returns:
1137  * none
1138  */
1139 static void
1140 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1141 {
1142 	struct raid_bdev *raid_bdev = bdev->ctxt;
1143 	struct raid_base_bdev_info *base_info;
1144 
1145 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1146 
1147 	if (raid_bdev->superblock_enabled) {
1148 		/* raid bdev configuration is stored in the superblock */
1149 		return;
1150 	}
1151 
1152 	spdk_json_write_object_begin(w);
1153 
1154 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1155 
1156 	spdk_json_write_named_object_begin(w, "params");
1157 	spdk_json_write_named_string(w, "name", bdev->name);
1158 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1159 	if (raid_bdev->strip_size_kb != 0) {
1160 		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1161 	}
1162 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1163 
1164 	spdk_json_write_named_array_begin(w, "base_bdevs");
1165 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1166 		if (base_info->name) {
1167 			spdk_json_write_string(w, base_info->name);
1168 		} else {
1169 			char str[32];
1170 
1171 			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
1172 			spdk_json_write_string(w, str);
1173 		}
1174 	}
1175 	spdk_json_write_array_end(w);
1176 	spdk_json_write_object_end(w);
1177 
1178 	spdk_json_write_object_end(w);
1179 }
1180 
1181 static int
1182 raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1183 {
1184 	struct raid_bdev *raid_bdev = ctx;
1185 	struct raid_base_bdev_info *base_info;
1186 	int domains_count = 0, rc = 0;
1187 
1188 	if (raid_bdev->module->memory_domains_supported == false) {
1189 		return 0;
1190 	}
1191 
1192 	/* First loop to get the number of memory domains */
1193 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1194 		if (base_info->is_configured == false) {
1195 			continue;
1196 		}
1197 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
1198 		if (rc < 0) {
1199 			return rc;
1200 		}
1201 		domains_count += rc;
1202 	}
1203 
1204 	if (!domains || array_size < domains_count) {
1205 		return domains_count;
1206 	}
1207 
1208 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1209 		if (base_info->is_configured == false) {
1210 			continue;
1211 		}
1212 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
1213 		if (rc < 0) {
1214 			return rc;
1215 		}
1216 		domains += rc;
1217 		array_size -= rc;
1218 	}
1219 
1220 	return domains_count;
1221 }
1222 
/* g_raid_bdev_fn_table is the function table for raid bdev, registered with
 * the bdev layer via spdk_bdev_register() in raid_bdev_configure_cont(). */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1233 
1234 struct raid_bdev *
1235 raid_bdev_find_by_name(const char *name)
1236 {
1237 	struct raid_bdev *raid_bdev;
1238 
1239 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1240 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1241 			return raid_bdev;
1242 		}
1243 	}
1244 
1245 	return NULL;
1246 }
1247 
1248 static struct raid_bdev *
1249 raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1250 {
1251 	struct raid_bdev *raid_bdev;
1252 
1253 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1254 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1255 			return raid_bdev;
1256 		}
1257 	}
1258 
1259 	return NULL;
1260 }
1261 
/* Mapping between user-visible raid level names (as accepted in RPCs, both
 * long and short forms) and the corresponding enum values. The table is
 * terminated by a zeroed sentinel entry (name == NULL). */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1275 
/* String names for each raid bdev state, indexed by enum raid_bdev_state.
 * The RAID_BDEV_STATE_MAX slot is NULL to terminate the array. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};
1282 
/* String names for each background process type, indexed by
 * enum raid_process_type. RAID_PROCESS_MAX terminates the array. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1288 
/* We have to use the typedef in the function declaration to appease astyle,
 * which otherwise mis-formats functions returning a bare enum type. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1292 
1293 raid_level_t
1294 raid_bdev_str_to_level(const char *str)
1295 {
1296 	unsigned int i;
1297 
1298 	assert(str != NULL);
1299 
1300 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1301 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1302 			return g_raid_level_names[i].value;
1303 		}
1304 	}
1305 
1306 	return INVALID_RAID_LEVEL;
1307 }
1308 
1309 const char *
1310 raid_bdev_level_to_str(enum raid_level level)
1311 {
1312 	unsigned int i;
1313 
1314 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1315 		if (g_raid_level_names[i].value == level) {
1316 			return g_raid_level_names[i].name;
1317 		}
1318 	}
1319 
1320 	return "";
1321 }
1322 
1323 raid_bdev_state_t
1324 raid_bdev_str_to_state(const char *str)
1325 {
1326 	unsigned int i;
1327 
1328 	assert(str != NULL);
1329 
1330 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1331 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1332 			break;
1333 		}
1334 	}
1335 
1336 	return i;
1337 }
1338 
1339 const char *
1340 raid_bdev_state_to_str(enum raid_bdev_state state)
1341 {
1342 	if (state >= RAID_BDEV_STATE_MAX) {
1343 		return "";
1344 	}
1345 
1346 	return g_raid_state_names[state];
1347 }
1348 
1349 const char *
1350 raid_bdev_process_to_str(enum raid_process_type value)
1351 {
1352 	if (value >= RAID_PROCESS_MAX) {
1353 		return "";
1354 	}
1355 
1356 	return g_raid_process_type_names[value];
1357 }
1358 
1359 /*
1360  * brief:
1361  * raid_bdev_fini_start is called when bdev layer is starting the
1362  * shutdown process
1363  * params:
1364  * none
1365  * returns:
1366  * none
1367  */
1368 static void
1369 raid_bdev_fini_start(void)
1370 {
1371 	struct raid_bdev *raid_bdev;
1372 	struct raid_base_bdev_info *base_info;
1373 
1374 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1375 
1376 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1377 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1378 			RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1379 				raid_bdev_free_base_bdev_resource(base_info);
1380 			}
1381 		}
1382 	}
1383 
1384 	g_shutdown_started = true;
1385 }
1386 
1387 /*
1388  * brief:
1389  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1390  * params:
1391  * none
1392  * returns:
1393  * none
1394  */
1395 static void
1396 raid_bdev_exit(void)
1397 {
1398 	struct raid_bdev *raid_bdev, *tmp;
1399 
1400 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1401 
1402 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1403 		raid_bdev_cleanup_and_free(raid_bdev);
1404 	}
1405 }
1406 
/* Emit the "bdev_raid_set_options" RPC object with the current module-level
 * options so that a saved config restores them on startup. */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_named_uint32(w, "process_max_bandwidth_mb_sec",
				     g_opts.process_max_bandwidth_mb_sec);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1422 
/* config_json callback for the raid bdev module - emits only module-level
 * options; per-bdev config is handled by raid_bdev_write_config_json(). */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1430 
/*
 * brief:
 * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
 * module
 * params:
 * none
 * returns:
 * size of spdk_bdev_io context for raid
 */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	/* The bdev layer allocates this much driver context per bdev_io */
	return sizeof(struct raid_bdev_io);
}
1446 
/* Bdev module descriptor for the raid module, registered with the bdev layer
 * below. fini_start/module_fini handle shutdown; examine_disk enables
 * auto-assembly of raid bdevs from superblocks on newly examined bdevs. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1459 
/*
 * brief:
 * raid_bdev_init is the initialization function for raid bdev module.
 * Nothing needs to be done at module init time; raid bdevs are created
 * via RPC or assembled during examine.
 * params:
 * none
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_init(void)
{
	return 0;
}
1474 
1475 static int
1476 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1477 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1478 		  struct raid_bdev **raid_bdev_out)
1479 {
1480 	struct raid_bdev *raid_bdev;
1481 	struct spdk_bdev *raid_bdev_gen;
1482 	struct raid_bdev_module *module;
1483 	struct raid_base_bdev_info *base_info;
1484 	uint8_t min_operational;
1485 
1486 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1487 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1488 		return -EINVAL;
1489 	}
1490 
1491 	if (raid_bdev_find_by_name(name) != NULL) {
1492 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1493 		return -EEXIST;
1494 	}
1495 
1496 	if (level == RAID1) {
1497 		if (strip_size != 0) {
1498 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1499 			return -EINVAL;
1500 		}
1501 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1502 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1503 		return -EINVAL;
1504 	}
1505 
1506 	module = raid_bdev_module_find(level);
1507 	if (module == NULL) {
1508 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1509 		return -EINVAL;
1510 	}
1511 
1512 	assert(module->base_bdevs_min != 0);
1513 	if (num_base_bdevs < module->base_bdevs_min) {
1514 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1515 			    module->base_bdevs_min,
1516 			    raid_bdev_level_to_str(level));
1517 		return -EINVAL;
1518 	}
1519 
1520 	switch (module->base_bdevs_constraint.type) {
1521 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1522 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1523 		break;
1524 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1525 		min_operational = module->base_bdevs_constraint.value;
1526 		break;
1527 	case CONSTRAINT_UNSET:
1528 		if (module->base_bdevs_constraint.value != 0) {
1529 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1530 				    (uint8_t)module->base_bdevs_constraint.value, name);
1531 			return -EINVAL;
1532 		}
1533 		min_operational = num_base_bdevs;
1534 		break;
1535 	default:
1536 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1537 			    (uint8_t)module->base_bdevs_constraint.type,
1538 			    raid_bdev_level_to_str(module->level));
1539 		return -EINVAL;
1540 	};
1541 
1542 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1543 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1544 			    raid_bdev_level_to_str(module->level));
1545 		return -EINVAL;
1546 	}
1547 
1548 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1549 	if (!raid_bdev) {
1550 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1551 		return -ENOMEM;
1552 	}
1553 
1554 	raid_bdev->module = module;
1555 	raid_bdev->num_base_bdevs = num_base_bdevs;
1556 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1557 					   sizeof(struct raid_base_bdev_info));
1558 	if (!raid_bdev->base_bdev_info) {
1559 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1560 		raid_bdev_free(raid_bdev);
1561 		return -ENOMEM;
1562 	}
1563 
1564 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1565 		base_info->raid_bdev = raid_bdev;
1566 	}
1567 
1568 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1569 	 * internally and set later.
1570 	 */
1571 	raid_bdev->strip_size = 0;
1572 	raid_bdev->strip_size_kb = strip_size;
1573 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1574 	raid_bdev->level = level;
1575 	raid_bdev->min_base_bdevs_operational = min_operational;
1576 	raid_bdev->superblock_enabled = superblock_enabled;
1577 
1578 	raid_bdev_gen = &raid_bdev->bdev;
1579 
1580 	raid_bdev_gen->name = strdup(name);
1581 	if (!raid_bdev_gen->name) {
1582 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1583 		raid_bdev_free(raid_bdev);
1584 		return -ENOMEM;
1585 	}
1586 
1587 	raid_bdev_gen->product_name = "Raid Volume";
1588 	raid_bdev_gen->ctxt = raid_bdev;
1589 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1590 	raid_bdev_gen->module = &g_raid_if;
1591 	raid_bdev_gen->write_cache = 0;
1592 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1593 
1594 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1595 
1596 	*raid_bdev_out = raid_bdev;
1597 
1598 	return 0;
1599 }
1600 
1601 /*
1602  * brief:
1603  * raid_bdev_create allocates raid bdev based on passed configuration
1604  * params:
1605  * name - name for raid bdev
1606  * strip_size - strip size in KB
1607  * num_base_bdevs - number of base bdevs
1608  * level - raid level
1609  * superblock_enabled - true if raid should have superblock
1610  * uuid - uuid to set for the bdev
1611  * raid_bdev_out - the created raid bdev
1612  * returns:
1613  * 0 - success
1614  * non zero - failure
1615  */
1616 int
1617 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1618 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1619 		 struct raid_bdev **raid_bdev_out)
1620 {
1621 	struct raid_bdev *raid_bdev;
1622 	int rc;
1623 
1624 	assert(uuid != NULL);
1625 
1626 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1627 			       &raid_bdev);
1628 	if (rc != 0) {
1629 		return rc;
1630 	}
1631 
1632 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1633 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1634 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1635 	}
1636 
1637 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1638 
1639 	*raid_bdev_out = raid_bdev;
1640 
1641 	return 0;
1642 }
1643 
/* App-thread continuation of raid bdev unregistration: close the internal
 * descriptor that was opened in raid_bdev_configure_cont(), allowing the
 * bdev layer to complete the unregister. */
static void
_raid_bdev_unregistering_cont(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;

	spdk_bdev_close(raid_bdev->self_desc);
	raid_bdev->self_desc = NULL;
}
1652 
/* Forward the unregistration continuation to the app thread, where the
 * internal descriptor must be closed. May be called from any thread. */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1658 
1659 static int
1660 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1661 {
1662 	struct raid_process_finish_action *finish_action;
1663 
1664 	assert(spdk_get_thread() == process->thread);
1665 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1666 
1667 	finish_action = calloc(1, sizeof(*finish_action));
1668 	if (finish_action == NULL) {
1669 		return -ENOMEM;
1670 	}
1671 
1672 	finish_action->cb = cb;
1673 	finish_action->cb_ctx = cb_ctx;
1674 
1675 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1676 
1677 	return 0;
1678 }
1679 
1680 static void
1681 raid_bdev_unregistering_stop_process(void *ctx)
1682 {
1683 	struct raid_bdev_process *process = ctx;
1684 	struct raid_bdev *raid_bdev = process->raid_bdev;
1685 	int rc;
1686 
1687 	process->state = RAID_PROCESS_STATE_STOPPING;
1688 	if (process->status == 0) {
1689 		process->status = -ECANCELED;
1690 	}
1691 
1692 	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
1693 	if (rc != 0) {
1694 		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
1695 			    raid_bdev->bdev.name, spdk_strerror(-rc));
1696 	}
1697 }
1698 
1699 static void
1700 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1701 {
1702 	struct raid_bdev *raid_bdev = event_ctx;
1703 
1704 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1705 		if (raid_bdev->process != NULL) {
1706 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1707 					     raid_bdev->process);
1708 		} else {
1709 			raid_bdev_unregistering_cont(raid_bdev);
1710 		}
1711 	}
1712 }
1713 
/* Final stage of raid bdev configuration: register the io_device and the
 * bdev with the bdev layer, open an internal descriptor, and invoke the
 * pending configure callback with the result. On any failure the raid bdev
 * is rolled back to the CONFIGURING state. */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	/* The raid_bdev pointer itself is used as the io_device handle */
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto out;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto out;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
out:
	if (rc != 0) {
		/* Roll back: stop the module and unwind the io_device registration */
		if (raid_bdev->module->stop != NULL) {
			raid_bdev->module->stop(raid_bdev);
		}
		spdk_io_device_unregister(raid_bdev, NULL);
		raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
	}

	if (raid_bdev->configure_cb != NULL) {
		/* Clear the callback before long - it must fire exactly once */
		raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, rc);
		raid_bdev->configure_cb = NULL;
	}
}
1768 
1769 static void
1770 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1771 {
1772 	if (status == 0) {
1773 		raid_bdev_configure_cont(raid_bdev);
1774 	} else {
1775 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1776 			    raid_bdev->bdev.name, spdk_strerror(-status));
1777 		if (raid_bdev->module->stop != NULL) {
1778 			raid_bdev->module->stop(raid_bdev);
1779 		}
1780 		if (raid_bdev->configure_cb != NULL) {
1781 			raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, status);
1782 			raid_bdev->configure_cb = NULL;
1783 		}
1784 	}
1785 }
1786 
/*
 * brief:
 * raid_bdev_configure brings a fully-discovered raid bdev online: it converts
 * the strip size to blocks, starts the raid module, prepares/validates the
 * superblock when enabled, and then registers the bdev (asynchronously via
 * the superblock write when applicable). The callback fires once
 * configuration completes, unless this function returns non-zero.
 * params:
 * raid_bdev - pointer to raid bdev
 * cb - configuration completion callback
 * cb_ctx - context for the callback
 * returns:
 * 0 - success (completion reported via cb)
 * non zero - failure (cb will not be called)
 */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev, raid_bdev_configure_cb cb, void *cb_ctx)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);

	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	assert(raid_bdev->configure_cb == NULL);
	raid_bdev->configure_cb = cb;
	raid_bdev->configure_cb_ctx = cb_ctx;

	if (raid_bdev->superblock_enabled) {
		if (raid_bdev->sb == NULL) {
			/* Fresh creation - allocate and initialize a new superblock */
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			/* Existing superblock (e.g. loaded during examine) - validate
			 * that it matches the assembled bdev's geometry. */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			/* Undo the module start and the stored callback before failing */
			raid_bdev->configure_cb = NULL;
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		/* Registration continues in raid_bdev_configure_write_sb_cb() */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1861 
1862 /*
1863  * brief:
1864  * If raid bdev is online and registered, change the bdev state to
1865  * configuring and unregister this raid device. Queue this raid device
1866  * in configuring list
1867  * params:
1868  * raid_bdev - pointer to raid bdev
1869  * cb_fn - callback function
1870  * cb_arg - argument to callback function
1871  * returns:
1872  * none
1873  */
1874 static void
1875 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1876 		      void *cb_arg)
1877 {
1878 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1879 		if (cb_fn) {
1880 			cb_fn(cb_arg, 0);
1881 		}
1882 		return;
1883 	}
1884 
1885 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1886 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1887 
1888 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1889 }
1890 
1891 /*
1892  * brief:
1893  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1894  * params:
1895  * base_bdev - pointer to base bdev
1896  * returns:
1897  * base bdev info if found, otherwise NULL.
1898  */
1899 static struct raid_base_bdev_info *
1900 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1901 {
1902 	struct raid_bdev *raid_bdev;
1903 	struct raid_base_bdev_info *base_info;
1904 
1905 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1906 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1907 			if (base_info->desc != NULL &&
1908 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1909 				return base_info;
1910 			}
1911 		}
1912 	}
1913 
1914 	return NULL;
1915 }
1916 
/* Final step of base bdev removal: clear the removal flag, update the
 * operational count, deconfigure the whole raid bdev if too few base bdevs
 * remain, and otherwise invoke the caller's removal callback. */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			/* Deconfigure takes over invoking the removal callback */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1938 
/* Completion callback for unquiescing the raid bdev after a base bdev was
 * removed. Logs an unquiesce failure and finishes the removal either way,
 * propagating the status. */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1952 
1953 static void
1954 raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
1955 {
1956 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1957 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1958 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
1959 	uint8_t idx = raid_bdev_base_bdev_slot(base_info);
1960 
1961 	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);
1962 
1963 	if (raid_ch->base_channel[idx] != NULL) {
1964 		spdk_put_io_channel(raid_ch->base_channel[idx]);
1965 		raid_ch->base_channel[idx] = NULL;
1966 	}
1967 
1968 	if (raid_ch->process.ch_processed != NULL) {
1969 		raid_ch->process.ch_processed->base_channel[idx] = NULL;
1970 	}
1971 
1972 	spdk_for_each_channel_continue(i, 0);
1973 }
1974 
/* Called after every channel dropped its reference to the removed base bdev:
 * free the base bdev's resources, then unquiesce the raid bdev. */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	raid_bdev_free_base_bdev_resource(base_info);

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1986 
/* Continue base bdev removal (after the optional superblock update):
 * deconfigure the base bdev and drop its I/O channel references on all
 * raid bdev channels. */
static void
raid_bdev_remove_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	raid_bdev_deconfigure_base_bdev(base_info);

	spdk_for_each_channel(base_info->raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
			      raid_bdev_channels_remove_base_bdev_done);
}
1995 
1996 static void
1997 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1998 {
1999 	struct raid_base_bdev_info *base_info = ctx;
2000 
2001 	if (status != 0) {
2002 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
2003 			    raid_bdev->bdev.name, spdk_strerror(-status));
2004 		raid_bdev_remove_base_bdev_done(base_info, status);
2005 		return;
2006 	}
2007 
2008 	raid_bdev_remove_base_bdev_cont(base_info);
2009 }
2010 
/*
 * Called when the raid bdev has been quiesced for a base bdev removal.
 * If a superblock is present, mark the removed base bdev's slot FAILED or
 * MISSING in it and persist the superblock before continuing the removal;
 * otherwise continue immediately.
 */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry for this slot and update its state */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				if (base_info->is_failed) {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;
				} else {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_MISSING;
				}

				/* Removal continues in the write completion callback */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}

	raid_bdev_remove_base_bdev_cont(base_info);
}
2048 
/*
 * Quiesce the raid bdev before detaching a base bdev; the removal continues
 * in raid_bdev_remove_base_bdev_on_quiesced(). Must run on the app thread.
 * Returns 0 on success or a negative errno from spdk_bdev_quiesce().
 */
static int
raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
}
2057 
/* Context for removing a base bdev while a background process is running */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The raid bdev's background process; stays valid until it fully stops */
	struct raid_bdev_process *process;
	/* The base bdev being removed */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of the number of configured, not-yet-removed base bdevs */
	uint8_t num_base_bdevs_operational;
};
2063 
/*
 * Runs on the app thread: quiesce the raid bdev to continue removing the
 * base bdev; complete the removal with an error if quiescing cannot start.
 */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc = raid_bdev_remove_base_bdev_quiesce(base_info);

	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2075 
/*
 * Finish action run on the process thread (or directly when the process does
 * not need to stop): hand the removal over to the app thread. The ctx is
 * freed here, so base_info is saved first and carried forward alone.
 */
static void
raid_bdev_process_base_bdev_remove_cont(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_base_bdev_info *base_info = ctx->base_info;

	free(ctx);

	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
			     base_info);
}
2087 
/*
 * Runs on the process thread. Decide whether the background process must be
 * stopped before the base bdev can be removed: stopping is required when the
 * removed bdev is the process target or when the removal would drop the raid
 * below its minimum number of operational base bdevs.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the removal until the process has fully stopped */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Record why the process is stopping, unless it already failed */
	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
2118 
/*
 * Start removing a base bdev while a background process is running on the
 * raid bdev. The decision whether to stop the process is made on the process
 * thread. Must be called on the app thread. Returns 0 on success or -ENOMEM.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (base_info->is_configured && !base_info->remove_scheduled) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2154 
/*
 * Core base bdev removal. Depending on the raid bdev's state this either
 * cleans up immediately (not online), deconfigures the whole raid bdev
 * (removal not tolerated), or performs an online removal - via the running
 * background process if there is one, otherwise by quiescing directly.
 * Must be called on the app thread.
 *
 * cb_fn/cb_ctx are invoked when the removal completes. Returns 0 on success,
 * -ENODEV if the base bdev is not removable, or an error from the async start.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (base_info->remove_scheduled || !base_info->is_configured) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0 &&
		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		if (raid_bdev->process != NULL) {
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		/* Roll back the scheduling flag if the async removal failed to start */
		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2212 
2213 /*
2214  * brief:
2215  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2216  * is removed. This function checks if this base bdev is part of any raid bdev
2217  * or not. If yes, it takes necessary action on that particular raid bdev.
2218  * params:
2219  * base_bdev - pointer to base bdev which got removed
2220  * cb_fn - callback function
2221  * cb_arg - argument to callback function
2222  * returns:
2223  * 0 - success
2224  * non zero - failure
2225  */
2226 int
2227 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2228 {
2229 	struct raid_base_bdev_info *base_info;
2230 
2231 	/* Find the raid_bdev which has claimed this base_bdev */
2232 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2233 	if (!base_info) {
2234 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2235 		return -ENODEV;
2236 	}
2237 
2238 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2239 }
2240 
2241 static void
2242 raid_bdev_fail_base_remove_cb(void *ctx, int status)
2243 {
2244 	struct raid_base_bdev_info *base_info = ctx;
2245 
2246 	if (status != 0) {
2247 		SPDK_WARNLOG("Failed to remove base bdev %s\n", base_info->name);
2248 		base_info->is_failed = false;
2249 	}
2250 }
2251 
/*
 * Runs on the app thread: mark a base bdev as failed and remove it from the
 * raid bdev. No-op if the bdev is already marked failed. The is_failed flag
 * is set before starting the removal so the superblock update in
 * raid_bdev_remove_base_bdev_on_quiesced() records the FAILED state.
 */
static void
_raid_bdev_fail_base_bdev(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc;

	if (base_info->is_failed) {
		return;
	}
	base_info->is_failed = true;

	SPDK_NOTICELOG("Failing base bdev in slot %d ('%s') of raid bdev '%s'\n",
		       raid_bdev_base_bdev_slot(base_info), base_info->name, base_info->raid_bdev->bdev.name);

	rc = _raid_bdev_remove_base_bdev(base_info, raid_bdev_fail_base_remove_cb, base_info);
	if (rc != 0) {
		/* Report the failure synchronously; this also clears is_failed */
		raid_bdev_fail_base_remove_cb(base_info, rc);
	}
}
2271 
/*
 * Mark a base bdev as failed and remove it from its raid bdev. Safe to call
 * from any thread - the work is executed on the app thread.
 */
void
raid_bdev_fail_base_bdev(struct raid_base_bdev_info *base_info)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_fail_base_bdev, base_info);
}
2277 
2278 static void
2279 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2280 {
2281 	if (status != 0) {
2282 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2283 			    raid_bdev->bdev.name, spdk_strerror(-status));
2284 	}
2285 }
2286 
2287 /*
2288  * brief:
2289  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2290  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2291  * If yes, call module handler to resize the raid_bdev if implemented.
2292  * params:
2293  * base_bdev - pointer to base bdev which got resized.
2294  * returns:
2295  * none
2296  */
static void
raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
{
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	uint64_t blockcnt_old;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");

	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);

	/* Find the raid_bdev which has claimed this base_bdev */
	if (!base_info) {
		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
		return;
	}
	raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);

	base_info->blockcnt = base_bdev->blockcnt;

	/* Resizing the raid bdev itself is optional per raid module */
	if (!raid_bdev->module->resize) {
		return;
	}

	blockcnt_old = raid_bdev->bdev.blockcnt;
	/* The module returns false when the raid bdev's size did not change */
	if (raid_bdev->module->resize(raid_bdev) == false) {
		return;
	}

	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);

	if (raid_bdev->superblock_enabled) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t i;

		/* Refresh per-base-bdev data sizes and the raid size in the superblock,
		 * then persist it. NOTE: base_info is reused as a loop cursor here. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
				sb_base_bdev->data_size = base_info->data_size;
			}
		}
		sb->raid_size = raid_bdev->bdev.blockcnt;
		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
	}
}
2350 
2351 /*
2352  * brief:
2353  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2354  * triggers asynchronous event.
2355  * params:
2356  * type - event details.
2357  * bdev - bdev that triggered event.
2358  * event_ctx - context for event.
2359  * returns:
2360  * none
2361  */
2362 static void
2363 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2364 			  void *event_ctx)
2365 {
2366 	int rc;
2367 
2368 	switch (type) {
2369 	case SPDK_BDEV_EVENT_REMOVE:
2370 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2371 		if (rc != 0) {
2372 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2373 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2374 		}
2375 		break;
2376 	case SPDK_BDEV_EVENT_RESIZE:
2377 		raid_bdev_resize_base_bdev(bdev);
2378 		break;
2379 	default:
2380 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2381 		break;
2382 	}
2383 }
2384 
2385 /*
2386  * brief:
2387  * Deletes the specified raid bdev
2388  * params:
2389  * raid_bdev - pointer to raid bdev
2390  * cb_fn - callback function
2391  * cb_arg - argument to callback function
2392  */
void
raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);

	/* Guard against concurrent/repeated delete requests */
	if (raid_bdev->destroy_started) {
		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
			      raid_bdev->bdev.name);
		if (cb_fn) {
			cb_fn(cb_arg, -EALREADY);
		}
		return;
	}

	raid_bdev->destroy_started = true;

	/* Schedule removal of every base bdev; free resources right away when
	 * the raid bdev is not online (no IO path to tear down). */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		base_info->remove_scheduled = true;

		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
			/*
			 * As raid bdev is not registered yet or already unregistered,
			 * so cleanup should be done here itself.
			 */
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* There is no base bdev for this raid, so free the raid device. */
		raid_bdev_cleanup_and_free(raid_bdev);
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
	} else {
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
	}
}
2433 
2434 static void
2435 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2436 {
2437 	if (status != 0) {
2438 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2439 			    raid_bdev->bdev.name, spdk_strerror(-status));
2440 	}
2441 }
2442 
/*
 * Runs on the app thread after a background process finished successfully:
 * mark superblock entries whose base bdevs are now configured as CONFIGURED
 * and persist the superblock.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
2467 
2468 static void raid_bdev_process_free(struct raid_bdev_process *process);
2469 
/*
 * Final step of stopping a background process, on the process thread:
 * run all queued finish actions, release the QoS poller, free the process
 * and exit the dedicated process thread.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_process_finish_action *finish_action;

	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
		finish_action->cb(finish_action->cb_ctx);
		free(finish_action);
	}

	spdk_poller_unregister(&process->qos.process_continue_poller);

	raid_bdev_process_free(process);

	spdk_thread_exit(spdk_get_thread());
}
2488 
/*
 * Completion of removing the process target after a failed process; the
 * finish sequence then completes on the process thread regardless of status.
 */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2500 
/*
 * Called after the raid bdev was unquiesced at the end of a background
 * process. If the process failed, remove its target base bdev before
 * completing; otherwise complete directly on the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		status = _raid_bdev_remove_base_bdev(process->target, raid_bdev_process_finish_target_removed,
						     process);
		if (status != 0) {
			/* Removal could not start - complete the finish sequence anyway */
			raid_bdev_process_finish_target_removed(process, status);
		}
		return;
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2521 
2522 static void
2523 raid_bdev_process_finish_unquiesce(void *ctx)
2524 {
2525 	struct raid_bdev_process *process = ctx;
2526 	int rc;
2527 
2528 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2529 				 raid_bdev_process_finish_unquiesced, process);
2530 	if (rc != 0) {
2531 		raid_bdev_process_finish_unquiesced(process, rc);
2532 	}
2533 }
2534 
/*
 * Runs on the process thread after all channels finished processing: release
 * the process' raid IO channel, mark the process stopped, persist the
 * superblock on success, and kick off the final unquiesce on the app thread.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		if (raid_bdev->superblock_enabled) {
			/* Superblock updates must happen on the app thread */
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2566 
/*
 * Completion of the per-channel process teardown: continue finishing on the
 * process thread.
 */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2574 
/*
 * Per-channel handler run while finishing a background process. On success
 * the target's dedicated process channel becomes the regular base channel
 * for the target's slot; then the channel's process state is cleaned up.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* Hand ownership of the channel to the slot; cleared so cleanup
		 * below does not release it. */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2593 
/*
 * Called once the raid bdev is quiesced for finishing the background process:
 * detach the process from the raid bdev and tear down per-channel state.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		/* NOTE(review): on quiesce failure the finish sequence stops here
		 * and the process is never completed - confirm this is intended. */
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		return;
	}

	raid_bdev->process = NULL;
	process->target->is_process_target = false;

	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2611 
2612 static void
2613 _raid_bdev_process_finish(void *ctx)
2614 {
2615 	struct raid_bdev_process *process = ctx;
2616 	int rc;
2617 
2618 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2619 			       raid_bdev_process_finish_quiesced, process);
2620 	if (rc != 0) {
2621 		raid_bdev_process_finish_quiesced(ctx, rc);
2622 	}
2623 }
2624 
/* Hand the process teardown over to the app thread. */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2630 
2631 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2632 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2633 
/*
 * Request the background process to stop with the given status. Must run on
 * the process thread. Idempotent: once STOPPING/STOPPED, only the first
 * non-zero status is kept. If a window lock is held it is released first;
 * otherwise the state machine advances directly.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2656 
/*
 * Called when the current process window's LBA range has been unquiesced:
 * advance the window offset and continue with the next iteration of the
 * process loop.
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2673 
/*
 * Unquiesce the LBA range of the current process window (the same range that
 * raid_bdev_process_lock_window_range() quiesced).
 */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		/* Deliver the failure through the same completion path */
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2688 
/*
 * All channels have updated their processed offset for the completed window;
 * release the window's LBA range lock.
 */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2696 
/*
 * Per-channel handler: advance the channel's processed offset to the end of
 * the window that just completed.
 */
static void
raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	raid_ch->process.offset = process->window_offset + process->window_size;

	spdk_for_each_channel_continue(i, 0);
}
2708 
/*
 * Complete one process request submitted by the raid module. The request is
 * returned to the free list; any error is latched into window_status. When
 * the whole window has completed, either finish the process on error or
 * propagate the new processed offset to all channels.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2734 
/*
 * Submit one process request for [offset_blocks, offset_blocks + num_blocks)
 * via the raid module.
 *
 * Returns the number of blocks actually submitted (> 0), 0 when no request
 * object is currently free or the module accepted nothing, or a negative
 * errno on submission failure (also latched into window_status).
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests are in flight; outstanding completions will continue the window */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may submit fewer blocks than requested */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2770 
/*
 * Fill the current window with process requests, up to max_window_size blocks
 * or the end of the raid bdev. If nothing could be submitted at all, the
 * window is empty and the process finishes with the latched window status.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* The effective window size is however many blocks got submitted */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2795 
/*
 * Called when the LBA range of the next window has been quiesced. If the
 * process was asked to stop in the meantime, release the range again;
 * otherwise start submitting requests for the window.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2816 
/*
 * Token-bucket rate limiting for the background process. Refill the bucket
 * based on elapsed ticks (capped at bytes_max), then charge one window's
 * worth of bytes if any budget is available. Returns true when the window
 * may proceed now; the balance is allowed to go negative, delaying the
 * next window accordingly.
 */
static bool
raid_bdev_process_consume_token(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t now = spdk_get_ticks();

	process->qos.bytes_available = spdk_min(process->qos.bytes_max,
						process->qos.bytes_available +
						(now - process->qos.last_tsc) * process->qos.bytes_per_tsc);
	process->qos.last_tsc = now;
	if (process->qos.bytes_available > 0.0) {
		process->qos.bytes_available -= process->window_size * raid_bdev->bdev.blocklen;
		return true;
	}
	return false;
}
2833 
/*
 * Try to start the next process window: when QoS is enabled, first consume a
 * token (on failure, arm the continue poller to retry and return false);
 * then quiesce the window's LBA range. Returns true when the window was
 * started (or its failure was reported through the locked callback).
 */
static bool
raid_bdev_process_lock_window_range(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(process->window_range_locked == false);

	if (process->qos.enable_qos) {
		if (raid_bdev_process_consume_token(process)) {
			/* Budget available - no need to keep polling */
			spdk_poller_pause(process->qos.process_continue_poller);
		} else {
			/* Out of budget - retry from the poller */
			spdk_poller_resume(process->qos.process_continue_poller);
			return false;
		}
	}

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
	return true;
}
2859 
2860 static int
2861 raid_bdev_process_continue_poll(void *arg)
2862 {
2863 	struct raid_bdev_process *process = arg;
2864 
2865 	if (raid_bdev_process_lock_window_range(process)) {
2866 		return SPDK_POLLER_BUSY;
2867 	}
2868 	return SPDK_POLLER_IDLE;
2869 }
2870 
/*
 * Main iteration of the background process, on the process thread. Runs
 * between windows (no IO outstanding, no range locked): tear down when
 * stopping, finish when the end of the bdev is reached, otherwise clamp the
 * window to the remaining blocks and start it.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);
	raid_bdev_process_lock_window_range(process);
}
2895 
/*
 * First message on the newly created process thread: acquire the raid IO
 * channel, register the QoS retry poller (paused until needed) and start the
 * process loop. On channel allocation failure the process is torn down.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	if (process->qos.enable_qos) {
		process->qos.process_continue_poller = SPDK_POLLER_REGISTER(raid_bdev_process_continue_poll,
						       process, 0);
		spdk_poller_pause(process->qos.process_continue_poller);
	}

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2926 
/*
 * All channels have cleaned up after a failed process start: remove the
 * would-be target base bdev and free the process.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2937 
/*
 * Per-channel handler for aborting a failed process start: undo any process
 * setup done on this channel.
 */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2948 
/*
 * All channels have set up their process state. Re-validate that the target
 * is still usable (it may have been removed while the channels were being
 * prepared), then spawn the dedicated process thread. On any failure, roll
 * back the per-channel setup.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status == 0 &&
	    (process->target->remove_scheduled || !process->target->is_configured ||
	     raid_bdev->num_base_bdevs_operational <= raid_bdev->min_base_bdevs_operational)) {
		/* a base bdev was removed before we got here */
		status = -ENODEV;
	}

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2990 
/* Per-channel step of process start: attach the process context to this
 * raid io channel and report the result to the channel iterator. */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(_ch);

	/* A non-zero setup status aborts the iteration early. */
	spdk_for_each_channel_continue(i, raid_bdev_ch_process_setup(raid_ch, proc));
}
3003 
3004 static void
3005 raid_bdev_process_start(struct raid_bdev_process *process)
3006 {
3007 	struct raid_bdev *raid_bdev = process->raid_bdev;
3008 
3009 	assert(raid_bdev->module->submit_process_request != NULL);
3010 
3011 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
3012 			      raid_bdev_channels_start_process_done);
3013 }
3014 
3015 static void
3016 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
3017 {
3018 	spdk_dma_free(process_req->iov.iov_base);
3019 	spdk_dma_free(process_req->md_buf);
3020 	free(process_req);
3021 }
3022 
3023 static struct raid_bdev_process_request *
3024 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
3025 {
3026 	struct raid_bdev *raid_bdev = process->raid_bdev;
3027 	struct raid_bdev_process_request *process_req;
3028 
3029 	process_req = calloc(1, sizeof(*process_req));
3030 	if (process_req == NULL) {
3031 		return NULL;
3032 	}
3033 
3034 	process_req->process = process;
3035 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
3036 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
3037 	if (process_req->iov.iov_base == NULL) {
3038 		free(process_req);
3039 		return NULL;
3040 	}
3041 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
3042 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
3043 		if (process_req->md_buf == NULL) {
3044 			raid_bdev_process_request_free(process_req);
3045 			return NULL;
3046 		}
3047 	}
3048 
3049 	return process_req;
3050 }
3051 
3052 static void
3053 raid_bdev_process_free(struct raid_bdev_process *process)
3054 {
3055 	struct raid_bdev_process_request *process_req;
3056 
3057 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
3058 		TAILQ_REMOVE(&process->requests, process_req, link);
3059 		raid_bdev_process_request_free(process_req);
3060 	}
3061 
3062 	free(process);
3063 }
3064 
/*
 * Allocate and initialize a background process (e.g. rebuild) for a raid
 * bdev, pre-allocating RAID_BDEV_PROCESS_MAX_QD requests with their buffers.
 * Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	/* Window size in blocks: the configured size in KB converted to data
	 * blocks, but never smaller than the bdev's write unit. */
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	if (g_opts.process_max_bandwidth_mb_sec != 0) {
		/* Token-bucket style bandwidth limit: bytes_per_tsc refills the
		 * budget as ticks elapse; bytes_max caps it at one millisecond's
		 * worth of the configured rate. */
		process->qos.enable_qos = true;
		process->qos.last_tsc = spdk_get_ticks();
		process->qos.bytes_per_tsc = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 /
					     spdk_get_ticks_hz();
		process->qos.bytes_max = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 / SPDK_SEC_TO_MSEC;
		process->qos.bytes_available = 0.0;
	}

	/* Pre-allocate the full queue depth of requests up front */
	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
3108 
3109 static int
3110 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
3111 {
3112 	struct raid_bdev_process *process;
3113 
3114 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3115 
3116 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
3117 	if (process == NULL) {
3118 		return -ENOMEM;
3119 	}
3120 
3121 	raid_bdev_process_start(process);
3122 
3123 	return 0;
3124 }
3125 
/* Forward declaration - needed by _raid_bdev_configure_base_bdev_cont() below */
static void raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info);
3127 
/* Channel sync completed - resume configuring the base bdev. */
static void
_raid_bdev_configure_base_bdev_cont(struct spdk_io_channel_iter *i, int status)
{
	raid_bdev_configure_base_bdev_cont(spdk_io_channel_iter_get_ctx(i));
}
3135 
/* No-op per-channel step, used only to synchronize with all io channels. */
static void
raid_bdev_ch_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
3141 
/*
 * Continue configuring a base bdev after it has been opened and claimed.
 * Depending on the raid bdev's state this either marks the bdev as a rebuild
 * target, completes the raid bdev configuration, or starts a rebuild.
 * Invokes the stored configure callback unless ownership of it was handed
 * off to raid_bdev_configure().
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	raid_base_bdev_cb configure_cb;
	int rc;

	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational &&
	    base_info->is_process_target == false) {
		/* TODO: defer if rebuild in progress on another base bdev */
		assert(raid_bdev->process == NULL);
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		base_info->is_process_target = true;
		/* To assure is_process_target is set before is_configured when checked in raid_bdev_create_cb() */
		spdk_for_each_channel(raid_bdev, raid_bdev_ch_sync, base_info, _raid_bdev_configure_base_bdev_cont);
		return;
	}

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/* Take ownership of the callback so it is invoked exactly once */
	configure_cb = base_info->configure_cb;
	base_info->configure_cb = NULL;
	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev, configure_cb, base_info->configure_cb_ctx);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		} else {
			/* raid_bdev_configure() now owns the callback */
			configure_cb = NULL;
		}
	} else if (base_info->is_process_target) {
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		rc = 0;
	}

	if (configure_cb != NULL) {
		configure_cb(base_info->configure_cb_ctx, rc);
	}
}
3197 
/* Forward declaration - used by raid_bdev_configure_base_bdev_check_sb_cb() */
static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3200 
/*
 * Completion of the superblock read issued when adding a new base bdev.
 * status == 0: a valid superblock was found - either it belongs to this raid
 * bdev (re-examine via raid_bdev_examine_sb()) or to a different one (-EEXIST).
 * status == -EINVAL: no superblock, continue configuring as a fresh member.
 * Any other status is a read error.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	raid_base_bdev_cb configure_cb = base_info->configure_cb;

	switch (status) {
	case 0:
		/* valid superblock found */
		base_info->configure_cb = NULL;
		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);

			/* Release this slot and re-attach through the superblock path */
			raid_bdev_free_base_bdev_resource(base_info);
			raid_bdev_examine_sb(sb, bdev, configure_cb, base_info->configure_cb_ctx);
			return;
		}
		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	/* Error paths: report the failure to the caller exactly once */
	if (configure_cb != NULL) {
		base_info->configure_cb = NULL;
		configure_cb(base_info->configure_cb_ctx, status);
	}
}
3238 
/*
 * Open, validate and claim a base bdev for its slot in the raid bdev.
 * params:
 * base_info - the slot to configure; name and/or uuid identify the bdev
 * existing - true when re-attaching a bdev already known to be a member
 *            (skips the superblock check performed for new bdevs)
 * cb_fn, cb_ctx - completion callback; configuration may complete
 *            asynchronously after this function returns 0
 * returns:
 * 0 on success (or deferred completion), negative errno on failure
 */
static int
raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
			      raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	const struct spdk_uuid *bdev_uuid;
	int rc;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->desc == NULL);

	/*
	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
	 * before claiming the bdev.
	 */

	if (!spdk_uuid_is_null(&base_info->uuid)) {
		char uuid_str[SPDK_UUID_STRING_LEN];
		const char *bdev_name;

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* UUID of a bdev is registered as its alias */
		bdev = spdk_bdev_get_by_name(uuid_str);
		if (bdev == NULL) {
			return -ENODEV;
		}

		bdev_name = spdk_bdev_get_name(bdev);

		if (base_info->name == NULL) {
			assert(existing == true);
			base_info->name = strdup(bdev_name);
			if (base_info->name == NULL) {
				return -ENOMEM;
			}
		} else if (strcmp(base_info->name, bdev_name) != 0) {
			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
				    bdev_name, base_info->name);
			return -EINVAL;
		}
	}

	assert(base_info->name != NULL);

	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
	if (rc != 0) {
		if (rc != -ENODEV) {
			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
		}
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	bdev_uuid = spdk_bdev_get_uuid(bdev);

	if (spdk_uuid_is_null(&base_info->uuid)) {
		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
		spdk_bdev_close(desc);
		return -EINVAL;
	}

	/* Claim the bdev so no other module can use it */
	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
		spdk_bdev_close(desc);
		return rc;
	}

	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);

	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
	if (base_info->app_thread_ch == NULL) {
		SPDK_ERRLOG("Failed to get io channel\n");
		spdk_bdev_module_release_bdev(bdev);
		spdk_bdev_close(desc);
		return -ENOMEM;
	}

	base_info->desc = desc;
	base_info->blockcnt = bdev->blockcnt;

	if (raid_bdev->superblock_enabled) {
		uint64_t data_offset;

		if (base_info->data_offset == 0) {
			/* Reserve space at the start of the bdev for the superblock */
			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
		} else {
			data_offset = base_info->data_offset;
		}

		if (bdev->optimal_io_boundary != 0) {
			/* Round the data offset up to the optimal IO boundary */
			data_offset = spdk_divide_round_up(data_offset,
							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
					     base_info->data_offset, base_info->name, data_offset);
				/* An explicitly requested offset takes precedence */
				data_offset = base_info->data_offset;
			}
		}

		base_info->data_offset = data_offset;
	}

	if (base_info->data_offset >= bdev->blockcnt) {
		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
			    base_info->data_offset, bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (base_info->data_size == 0) {
		base_info->data_size = bdev->blockcnt - base_info->data_offset;
	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
			    bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
			    bdev->name);
		rc = -EINVAL;
		goto out;
	}

	/*
	 * Set the raid bdev properties if this is the first base bdev configured,
	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
	 * have the same blocklen and metadata format.
	 */
	if (raid_bdev->bdev.blocklen == 0) {
		raid_bdev->bdev.blocklen = bdev->blocklen;
		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
		raid_bdev->bdev.dif_pi_format = bdev->dif_pi_format;
	} else {
		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
			rc = -EINVAL;
			goto out;
		}

		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev) ||
		    raid_bdev->bdev.dif_pi_format != bdev->dif_pi_format) {
			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
				    raid_bdev->bdev.name, bdev->name);
			rc = -EINVAL;
			goto out;
		}
	}

	assert(base_info->configure_cb == NULL);
	base_info->configure_cb = cb_fn;
	base_info->configure_cb_ctx = cb_ctx;

	if (existing) {
		raid_bdev_configure_base_bdev_cont(base_info);
	} else {
		/* check for existing superblock when using a new bdev */
		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
		if (rc) {
			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
				    bdev->name, spdk_strerror(-rc));
		}
	}
out:
	if (rc != 0) {
		/* Release the claim, descriptor and io channel acquired above */
		base_info->configure_cb = NULL;
		raid_bdev_free_base_bdev_resource(base_info);
	}
	return rc;
}
3427 
/*
 * Add a base bdev to a raid bdev by name, finding a suitable empty slot.
 * When the raid bdev is CONFIGURING, a slot whose superblock uuid matches the
 * bdev is preferred; otherwise the first fully-empty slot is used. Adding is
 * rejected while a background process is running.
 * Must be called on the app thread.
 * returns: 0 on success, negative errno on failure
 */
int
raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
			raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_base_bdev_info *base_info = NULL, *iter;
	int rc;

	assert(name != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->process != NULL) {
		SPDK_ERRLOG("raid bdev '%s' is in process\n",
			    raid_bdev->bdev.name);
		return -EPERM;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);

		/* Prefer a slot reserved for this bdev's uuid (e.g. from a superblock) */
		if (bdev != NULL) {
			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
				if (iter->name == NULL &&
				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
					base_info = iter;
					break;
				}
			}
		}
	}

	/* Otherwise, take the first completely empty slot */
	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
				base_info = iter;
				break;
			}
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
			    raid_bdev->bdev.name, name);
		return -EINVAL;
	}

	assert(base_info->is_configured == false);

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		assert(base_info->data_size != 0);
		assert(base_info->desc == NULL);
	}

	base_info->name = strdup(name);
	if (base_info->name == NULL) {
		return -ENOMEM;
	}

	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
		/* -ENODEV while CONFIGURING is tolerated: the bdev may appear later */
		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
		free(base_info->name);
		base_info->name = NULL;
	}

	return rc;
}
3494 
3495 static int
3496 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3497 {
3498 	struct raid_bdev *raid_bdev;
3499 	uint8_t i;
3500 	int rc;
3501 
3502 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3503 			       sb->level, true, &sb->uuid, &raid_bdev);
3504 	if (rc != 0) {
3505 		return rc;
3506 	}
3507 
3508 	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3509 	if (rc != 0) {
3510 		raid_bdev_free(raid_bdev);
3511 		return rc;
3512 	}
3513 
3514 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3515 	memcpy(raid_bdev->sb, sb, sb->length);
3516 
3517 	for (i = 0; i < sb->base_bdevs_size; i++) {
3518 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3519 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3520 
3521 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3522 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3523 			raid_bdev->num_base_bdevs_operational++;
3524 		}
3525 
3526 		base_info->data_offset = sb_base_bdev->data_offset;
3527 		base_info->data_size = sb_base_bdev->data_size;
3528 	}
3529 
3530 	*raid_bdev_out = raid_bdev;
3531 	return 0;
3532 }
3533 
3534 static void
3535 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3536 {
3537 	struct raid_bdev *raid_bdev;
3538 	struct raid_base_bdev_info *base_info;
3539 
3540 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3541 		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3542 			continue;
3543 		}
3544 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3545 			if (base_info->desc == NULL &&
3546 			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3547 			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3548 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3549 				break;
3550 			}
3551 		}
3552 	}
3553 }
3554 
/* Context for examining the remaining members of a raid bdev discovered
 * through a superblock. */
struct raid_bdev_examine_others_ctx {
	/* UUID identifying the raid bdev being assembled */
	struct spdk_uuid raid_bdev_uuid;
	/* Slot index to resume the member search from */
	uint8_t current_base_bdev_idx;
	/* Callback invoked when examination finishes (may be NULL) */
	raid_base_bdev_cb cb_fn;
	void *cb_ctx;
};
3561 
3562 static void
3563 raid_bdev_examine_others_done(void *_ctx, int status)
3564 {
3565 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3566 
3567 	if (ctx->cb_fn != NULL) {
3568 		ctx->cb_fn(ctx->cb_ctx, status);
3569 	}
3570 	free(ctx);
3571 }
3572 
/* Callback for raid_bdev_examine_load_sb(): receives the superblock read from
 * the bdev (valid only when status == 0). */
typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
		const struct raid_bdev_superblock *sb, int status, void *ctx);
/* Forward declarations for the mutually-recursive examine sequence */
static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
				     void *cb_ctx);
static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
				 raid_base_bdev_cb cb_fn, void *cb_ctx);
static void raid_bdev_examine_others(void *_ctx, int status);
3580 
3581 static void
3582 raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3583 				 int status, void *_ctx)
3584 {
3585 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3586 
3587 	if (status != 0) {
3588 		raid_bdev_examine_others_done(ctx, status);
3589 		return;
3590 	}
3591 
3592 	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3593 }
3594 
/*
 * Examine the remaining members of a raid bdev that was just created from a
 * superblock. Scans slots starting at ctx->current_base_bdev_idx for an
 * unconfigured member whose bdev is already registered, loads its superblock
 * and recurses via raid_bdev_examine_others_load_cb(). Terminates via
 * raid_bdev_examine_others_done() when no more members can be processed.
 */
static void
raid_bdev_examine_others(void *_ctx, int status)
{
	struct raid_bdev_examine_others_ctx *ctx = _ctx;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	/* -EEXIST means the member was already configured - not fatal here */
	if (status != 0 && status != -EEXIST) {
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
	if (raid_bdev == NULL) {
		status = -ENODEV;
		goto out;
	}

	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
	     base_info++) {
		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
			continue;
		}

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* The member's bdev is registered under its uuid alias */
		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
			continue;
		}

		/* Remember where to resume on the next iteration */
		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);

		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
		if (status != 0) {
			continue;
		}
		return;
	}
out:
	raid_bdev_examine_others_done(ctx, status);
}
3637 
3638 static void
3639 raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3640 		     raid_base_bdev_cb cb_fn, void *cb_ctx)
3641 {
3642 	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
3643 	struct raid_bdev *raid_bdev;
3644 	struct raid_base_bdev_info *iter, *base_info;
3645 	uint8_t i;
3646 	int rc;
3647 
3648 	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
3649 		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
3650 			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
3651 		rc = -EINVAL;
3652 		goto out;
3653 	}
3654 
3655 	if (spdk_uuid_is_null(&sb->uuid)) {
3656 		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
3657 		rc = -EINVAL;
3658 		goto out;
3659 	}
3660 
3661 	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);
3662 
3663 	if (raid_bdev) {
3664 		if (sb->seq_number > raid_bdev->sb->seq_number) {
3665 			SPDK_DEBUGLOG(bdev_raid,
3666 				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
3667 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3668 
3669 			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
3670 				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
3671 					     raid_bdev->bdev.name, bdev->name);
3672 				rc = -EBUSY;
3673 				goto out;
3674 			}
3675 
3676 			/* remove and then recreate the raid bdev using the newer superblock */
3677 			raid_bdev_delete(raid_bdev, NULL, NULL);
3678 			raid_bdev = NULL;
3679 		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
3680 			SPDK_DEBUGLOG(bdev_raid,
3681 				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
3682 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3683 			/* use the current raid bdev superblock */
3684 			sb = raid_bdev->sb;
3685 		}
3686 	}
3687 
3688 	for (i = 0; i < sb->base_bdevs_size; i++) {
3689 		sb_base_bdev = &sb->base_bdevs[i];
3690 
3691 		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);
3692 
3693 		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3694 			break;
3695 		}
3696 	}
3697 
3698 	if (i == sb->base_bdevs_size) {
3699 		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
3700 		rc = -EINVAL;
3701 		goto out;
3702 	}
3703 
3704 	if (!raid_bdev) {
3705 		struct raid_bdev_examine_others_ctx *ctx;
3706 
3707 		ctx = calloc(1, sizeof(*ctx));
3708 		if (ctx == NULL) {
3709 			rc = -ENOMEM;
3710 			goto out;
3711 		}
3712 
3713 		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
3714 		if (rc != 0) {
3715 			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
3716 				    sb->name, spdk_strerror(-rc));
3717 			free(ctx);
3718 			goto out;
3719 		}
3720 
3721 		/* after this base bdev is configured, examine other base bdevs that may be present */
3722 		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
3723 		ctx->cb_fn = cb_fn;
3724 		ctx->cb_ctx = cb_ctx;
3725 
3726 		cb_fn = raid_bdev_examine_others;
3727 		cb_ctx = ctx;
3728 	}
3729 
3730 	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3731 		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
3732 		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3733 		assert(base_info->is_configured == false);
3734 		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
3735 		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
3736 		assert(spdk_uuid_is_null(&base_info->uuid));
3737 		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3738 		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
3739 		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3740 		if (rc != 0) {
3741 			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3742 				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3743 		}
3744 		goto out;
3745 	}
3746 
3747 	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
3748 		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
3749 			       bdev->name, raid_bdev->bdev.name);
3750 		rc = -EINVAL;
3751 		goto out;
3752 	}
3753 
3754 	base_info = NULL;
3755 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3756 		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3757 			base_info = iter;
3758 			break;
3759 		}
3760 	}
3761 
3762 	if (base_info == NULL) {
3763 		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
3764 			    bdev->name, raid_bdev->bdev.name);
3765 		rc = -EINVAL;
3766 		goto out;
3767 	}
3768 
3769 	if (base_info->is_configured) {
3770 		rc = -EEXIST;
3771 		goto out;
3772 	}
3773 
3774 	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3775 	if (rc != 0) {
3776 		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3777 			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3778 	}
3779 out:
3780 	if (rc != 0 && cb_fn != 0) {
3781 		cb_fn(cb_ctx, rc);
3782 	}
3783 }
3784 
/* State kept alive across an asynchronous superblock read during examine. */
struct raid_bdev_examine_ctx {
	/* Read-only descriptor opened on the examined bdev */
	struct spdk_bdev_desc *desc;
	/* IO channel used for the superblock read */
	struct spdk_io_channel *ch;
	/* Callback and its argument, invoked when the read completes */
	raid_bdev_examine_load_sb_cb cb;
	void *cb_ctx;
};
3791 
3792 static void
3793 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3794 {
3795 	if (!ctx) {
3796 		return;
3797 	}
3798 
3799 	if (ctx->ch) {
3800 		spdk_put_io_channel(ctx->ch);
3801 	}
3802 
3803 	if (ctx->desc) {
3804 		spdk_bdev_close(ctx->desc);
3805 	}
3806 
3807 	free(ctx);
3808 }
3809 
3810 static void
3811 raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3812 {
3813 	struct raid_bdev_examine_ctx *ctx = _ctx;
3814 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3815 
3816 	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3817 
3818 	raid_bdev_examine_ctx_free(ctx);
3819 }
3820 
/* Event callback for the short-lived descriptor opened during examine.
 * Events are deliberately ignored - the descriptor is closed as soon as the
 * superblock read completes. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3825 
3826 static int
3827 raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
3828 {
3829 	struct raid_bdev_examine_ctx *ctx;
3830 	int rc;
3831 
3832 	assert(cb != NULL);
3833 
3834 	ctx = calloc(1, sizeof(*ctx));
3835 	if (!ctx) {
3836 		return -ENOMEM;
3837 	}
3838 
3839 	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
3840 	if (rc) {
3841 		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
3842 		goto err;
3843 	}
3844 
3845 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3846 	if (!ctx->ch) {
3847 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
3848 		rc = -ENOMEM;
3849 		goto err;
3850 	}
3851 
3852 	ctx->cb = cb;
3853 	ctx->cb_ctx = cb_ctx;
3854 
3855 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
3856 	if (rc) {
3857 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3858 			    bdev_name, spdk_strerror(-rc));
3859 		goto err;
3860 	}
3861 
3862 	return 0;
3863 err:
3864 	raid_bdev_examine_ctx_free(ctx);
3865 	return rc;
3866 }
3867 
3868 static void
3869 raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3870 		       void *ctx)
3871 {
3872 	switch (status) {
3873 	case 0:
3874 		/* valid superblock found */
3875 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3876 		raid_bdev_examine_sb(sb, bdev, NULL, NULL);
3877 		break;
3878 	case -EINVAL:
3879 		/* no valid superblock, check if it can be claimed anyway */
3880 		raid_bdev_examine_no_sb(bdev);
3881 		break;
3882 	default:
3883 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3884 			    bdev->name, spdk_strerror(-status));
3885 		break;
3886 	}
3887 
3888 	spdk_bdev_module_examine_done(&g_raid_if);
3889 }
3890 
3891 /*
3892  * brief:
3893  * raid_bdev_examine function is the examine function call by the below layers
3894  * like bdev_nvme layer. This function will check if this base bdev can be
3895  * claimed by this raid bdev or not.
3896  * params:
3897  * bdev - pointer to base bdev
3898  * returns:
3899  * none
3900  */
3901 static void
3902 raid_bdev_examine(struct spdk_bdev *bdev)
3903 {
3904 	int rc;
3905 
3906 	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
3907 		goto done;
3908 	}
3909 
3910 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3911 		raid_bdev_examine_no_sb(bdev);
3912 		goto done;
3913 	}
3914 
3915 	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
3916 	if (rc != 0) {
3917 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3918 			    bdev->name, spdk_strerror(-rc));
3919 		goto done;
3920 	}
3921 
3922 	return;
3923 done:
3924 	spdk_bdev_module_examine_done(&g_raid_if);
3925 }
3926 
3927 /* Log component for bdev raid bdev module */
3928 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3929