xref: /spdk/module/bdev/raid/bdev_raid.c (revision a91b02b21dc8435b8d024aed794ea016d855c839)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
/* Sentinel meaning "no valid block offset" for process/split offsets. */
#define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
/* Maximum number of outstanding background process requests. */
#define RAID_BDEV_PROCESS_MAX_QD	16

/* Default background process window size, in KiB (see g_opts). */
#define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024

/* Set once module shutdown has started; checked during destruct. */
static bool g_shutdown_started = false;

/* List of all raid bdevs */
struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);

/* Registered raid level modules, one per raid level. */
static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
/*
 * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
 * contains the relationship of raid bdev io channel with base bdev io channels.
 */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Blocks below this offset have been processed; set to
		 * RAID_OFFSET_BLOCKS_INVALID when no process is active. */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Shadow channel with the target's channel substituted in,
		 * used for I/O to the already-processed range. */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
46 
/* Lifecycle states of a raid bdev background process. */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
53 
/* State of a background process (e.g. rebuild) running on a raid bdev. */
struct raid_bdev_process {
	/* The raid bdev the process operates on */
	struct raid_bdev		*raid_bdev;
	/* Kind of process (rebuild, etc.) */
	enum raid_process_type		type;
	/* Current lifecycle state */
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	/* IO channel used by the process */
	struct raid_bdev_io_channel	*raid_ch;
	/* Queue of process requests */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* Upper bound for the process window size */
	uint64_t			max_window_size;
	/* Current process window size */
	uint64_t			window_size;
	/* Amount of the current window not yet completed */
	uint64_t			window_remaining;
	/* Status of the current window's requests */
	int				window_status;
	/* Offset of the current window, in blocks */
	uint64_t			window_offset;
	/* True while the window range is locked (quiesced) */
	bool				window_range_locked;
	/* Base bdev targeted by the process */
	struct raid_base_bdev_info	*target;
	/* Overall process result */
	int				status;
	/* Callbacks to run when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
};
71 
/* Deferred callback to invoke when a background process finishes. */
struct raid_process_finish_action {
	/* Function to call */
	spdk_msg_fn cb;
	/* Context passed to cb */
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
77 
/* Global raid bdev options; read via raid_bdev_get_opts(), set via raid_bdev_set_opts(). */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
};
81 
82 void
83 raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
84 {
85 	*opts = g_opts;
86 }
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
126 struct spdk_io_channel *
127 raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
128 {
129 	return raid_ch->base_channel[idx];
130 }
131 
132 void *
133 raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
134 {
135 	assert(raid_ch->module_channel != NULL);
136 
137 	return spdk_io_channel_get_ctx(raid_ch->module_channel);
138 }
139 
140 /* Function declarations */
141 static void	raid_bdev_examine(struct spdk_bdev *bdev);
142 static int	raid_bdev_init(void);
143 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
144 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
145 
146 static void
147 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
148 {
149 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
150 
151 	if (raid_ch->process.target_ch != NULL) {
152 		spdk_put_io_channel(raid_ch->process.target_ch);
153 		raid_ch->process.target_ch = NULL;
154 	}
155 
156 	if (raid_ch->process.ch_processed != NULL) {
157 		free(raid_ch->process.ch_processed->base_channel);
158 		free(raid_ch->process.ch_processed);
159 		raid_ch->process.ch_processed = NULL;
160 	}
161 }
162 
/*
 * Set up per-channel state for a running background process: record the current
 * window offset, acquire the process target's IO channel, and build a "processed"
 * shadow channel that mirrors raid_ch but routes the target slot to the target's
 * channel. Returns 0 on success, -ENOMEM on any failure (all partial state is
 * released via raid_bdev_ch_process_cleanup()).
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	/* New channels start at the process's current window so all channels agree
	 * on which range counts as processed. */
	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	/* Stored immediately so the cleanup path can free it on later failures. */
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* The shadow channel shares every base channel except the target's slot,
	 * which points at the freshly acquired target channel. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The shadow channel itself never re-splits I/O. */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
212 
/*
 * brief:
 * raid_bdev_create_cb function is a cb function for raid bdev which creates the
 * hierarchy from raid bdev to base bdev io channels. It will be called per core
 * params:
 * io_device - pointer to raid bdev io device represented by raid_bdev
 * ctx_buf - pointer to context buffer for raid bdev io channel
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	/* The lock keeps base bdev descriptors and the process pointer stable
	 * while the per-base channels are acquired. */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].desc == NULL ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		/* No background process running - mark the channel accordingly. */
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err_unlocked;
		}
	}

	return 0;
err:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
err_unlocked:
	/* Release every base channel acquired before the failure, then any
	 * partially constructed process state. */
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
298 
299 /*
300  * brief:
301  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
302  * hierarchy from raid bdev to base bdev io channels. It will be called per core
303  * params:
304  * io_device - pointer to raid bdev io device represented by raid_bdev
305  * ctx_buf - pointer to context buffer for raid bdev io channel
306  * returns:
307  * none
308  */
309 static void
310 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
311 {
312 	struct raid_bdev *raid_bdev = io_device;
313 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
314 	uint8_t i;
315 
316 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
317 
318 	assert(raid_ch != NULL);
319 	assert(raid_ch->base_channel);
320 
321 	if (raid_ch->module_channel) {
322 		spdk_put_io_channel(raid_ch->module_channel);
323 	}
324 
325 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
326 		/* Free base bdev channels */
327 		if (raid_ch->base_channel[i] != NULL) {
328 			spdk_put_io_channel(raid_ch->base_channel[i]);
329 		}
330 	}
331 	free(raid_ch->base_channel);
332 	raid_ch->base_channel = NULL;
333 
334 	raid_bdev_ch_process_cleanup(raid_ch);
335 }
336 
/*
 * brief:
 * raid_bdev_cleanup is used to cleanup raid_bdev related data
 * structures.
 * params:
 * raid_bdev - pointer to raid_bdev
 * returns:
 * none
 */
static void
raid_bdev_cleanup(struct raid_bdev *raid_bdev)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
	/* Must only run after the bdev went offline and only on the app thread. */
	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/* Descriptors must already have been closed
		 * (see raid_bdev_free_base_bdev_resource()). */
		assert(base_info->desc == NULL);
		free(base_info->name);
	}

	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
}
363 
364 static void
365 raid_bdev_free(struct raid_bdev *raid_bdev)
366 {
367 	spdk_dma_free(raid_bdev->sb);
368 	spdk_spin_destroy(&raid_bdev->base_bdev_lock);
369 	free(raid_bdev->base_bdev_info);
370 	free(raid_bdev->bdev.name);
371 	free(raid_bdev);
372 }
373 
/* Convenience helper: unlink the raid bdev from global state and free it. */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
380 
/*
 * brief:
 * free resource of base bdev for raid bdev
 * params:
 * base_info - raid base bdev info
 * returns:
 * none
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* NOTE(review): the uuid is preserved while still CONFIGURING - presumably
	 * so the slot can be matched again during assembly; confirm with callers. */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}

	/* Nothing below applies if the base bdev was never opened. */
	if (base_info->desc == NULL) {
		return;
	}

	/* Drop the module claim, close the descriptor and the app-thread channel. */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		assert(raid_bdev->num_base_bdevs_discovered);
		raid_bdev->num_base_bdevs_discovered--;
		base_info->is_configured = false;
	}
}
418 
419 static void
420 raid_bdev_io_device_unregister_cb(void *io_device)
421 {
422 	struct raid_bdev *raid_bdev = io_device;
423 
424 	if (raid_bdev->num_base_bdevs_discovered == 0) {
425 		/* Free raid_bdev when there are no base bdevs left */
426 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
427 		raid_bdev_cleanup(raid_bdev);
428 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
429 		raid_bdev_free(raid_bdev);
430 	} else {
431 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
432 	}
433 }
434 
435 void
436 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
437 {
438 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
439 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
440 	}
441 }
442 
/*
 * Destruct body, executed on the app thread: release base bdev resources that
 * are scheduled for removal (or all of them on shutdown), let the module stop,
 * and finish via raid_bdev_module_stop_done().
 */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	/* A background process must have been stopped before destruct. */
	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	/* A false return means the module stops asynchronously - presumably it
	 * calls raid_bdev_module_stop_done() itself when done (TODO confirm
	 * against the module API contract). */
	if (raid_bdev->module->stop != NULL) {
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
475 
476 static int
477 raid_bdev_destruct(void *ctx)
478 {
479 	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);
480 
481 	return 1;
482 }
483 
484 /**
485  * Raid bdev I/O read/write wrapper for spdk_bdev_readv_blocks_ext function.
486  */
487 int
488 raid_bdev_readv_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
489 			   struct iovec *iov, int iovcnt, uint64_t offset_blocks,
490 			   uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
491 			   struct spdk_bdev_ext_io_opts *opts)
492 {
493 	return spdk_bdev_readv_blocks_ext(base_info->desc, ch, iov, iovcnt,
494 					  base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
495 }
496 
497 /**
498  * Raid bdev I/O read/write wrapper for spdk_bdev_writev_blocks_ext function.
499  */
500 int
501 raid_bdev_writev_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
502 			    struct iovec *iov, int iovcnt, uint64_t offset_blocks,
503 			    uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
504 			    struct spdk_bdev_ext_io_opts *opts)
505 {
506 	return spdk_bdev_writev_blocks_ext(base_info->desc, ch, iov, iovcnt,
507 					   base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
508 }
509 
/*
 * Complete a raid I/O. For split I/Os (see raid_bdev_io_split()) this also
 * drives the second leg: on successful completion of the upper part, the
 * raid_io is rewound to cover the lower (already-processed) part and
 * resubmitted; only when both parts are done is the parent bdev_io completed.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Rewind the start of the I/O to the parent's values. */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* raid_io->iovs still points into the parent's iov array;
				 * the pointer difference is the lower part's iov count. */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The boundary iov was trimmed for the upper part:
					 * count it, set its length to the lower-part portion
					 * and restore its original base. */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The lower part targets the processed range - use the
				 * shadow channel that includes the process target. */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Whole split I/O finished (or the first part failed): restore all
		 * fields from the parent bdev_io before completing. */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		spdk_bdev_io_complete(bdev_io, status);
	}
}
560 
561 /*
562  * brief:
563  * raid_bdev_io_complete_part - signal the completion of a part of the expected
564  * base bdev IOs and complete the raid_io if this is the final expected IO.
565  * The caller should first set raid_io->base_bdev_io_remaining. This function
566  * will decrement this counter by the value of the 'completed' parameter and
567  * complete the raid_io if the counter reaches 0. The caller is free to
568  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
569  * it can represent e.g. blocks or IOs.
570  * params:
571  * raid_io - pointer to raid_bdev_io
572  * completed - the part of the raid_io that has been completed
573  * status - status of the base IO
574  * returns:
575  * true - if the raid_io is completed
576  * false - otherwise
577  */
578 bool
579 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
580 			   enum spdk_bdev_io_status status)
581 {
582 	assert(raid_io->base_bdev_io_remaining >= completed);
583 	raid_io->base_bdev_io_remaining -= completed;
584 
585 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
586 		raid_io->base_bdev_io_status = status;
587 	}
588 
589 	if (raid_io->base_bdev_io_remaining == 0) {
590 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
591 		return true;
592 	} else {
593 		return false;
594 	}
595 }
596 
597 /*
598  * brief:
599  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
600  * It will try to queue the IOs after storing the context to bdev wait queue logic.
601  * params:
602  * raid_io - pointer to raid_bdev_io
603  * bdev - the block device that the IO is submitted to
604  * ch - io channel
605  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
606  * returns:
607  * none
608  */
609 void
610 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
611 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
612 {
613 	raid_io->waitq_entry.bdev = bdev;
614 	raid_io->waitq_entry.cb_fn = cb_fn;
615 	raid_io->waitq_entry.cb_arg = raid_io;
616 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
617 }
618 
619 static void
620 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
621 {
622 	struct raid_bdev_io *raid_io = cb_arg;
623 
624 	spdk_bdev_free_io(bdev_io);
625 
626 	raid_bdev_io_complete_part(raid_io, 1, success ?
627 				   SPDK_BDEV_IO_STATUS_SUCCESS :
628 				   SPDK_BDEV_IO_STATUS_FAILED);
629 }
630 
631 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
632 
/* io_wait retry trampoline: re-enter reset submission with the queued raid_io. */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	raid_bdev_submit_reset_request((struct raid_bdev_io *)_raid_io);
}
640 
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* First entry: expect one completion per base bdev. Non-zero means this
	 * is a retry after -ENOMEM and the counter is already set. */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from the first not-yet-submitted base bdev. */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev - count it as a successful completion. */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Out of bdev_ios - retry from this base bdev when one frees up. */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
690 
/*
 * Split raid_io at split_offset (blocks): adjust it to cover only the upper
 * part [split_offset, end) and record enough state in raid_io->split to
 * later rewind and submit the lower part (see raid_bdev_io_complete()).
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	/* Byte offset of the split point within the iov array. */
	size_t iov_offset = (split_offset << raid_bdev->blocklen_shift);
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Advance the I/O past the split point; metadata advances in step. */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Locate the iovec containing the split point and trim the iov array. */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iov boundary - no iov is modified. */
				raid_io->split.iov = NULL;
			} else {
				/* Split falls inside this iov - keep a copy so it can be
				 * restored, then advance it to the split point. */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
728 
/*
 * Submit a read/write to the raid module, routing around an active background
 * process on this channel: I/O entirely below the process offset uses the
 * "processed" shadow channel; I/O straddling the offset is split first.
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
761 
762 /*
763  * brief:
764  * Callback function to spdk_bdev_io_get_buf.
765  * params:
766  * ch - pointer to raid bdev io channel
767  * bdev_io - pointer to parent bdev_io on raid bdev device
768  * success - True if buffer is allocated or false otherwise.
769  * returns:
770  * none
771  */
772 static void
773 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
774 		     bool success)
775 {
776 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
777 
778 	if (!success) {
779 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
780 		return;
781 	}
782 
783 	raid_bdev_submit_rw_request(raid_io);
784 }
785 
/*
 * Initialize a raid_bdev_io before submission: copy the I/O parameters and
 * reset all per-submission bookkeeping (remaining/submitted counters, status,
 * completion callback, split state).
 */
void
raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
{
	/* The owning raid_bdev is the io_device the channel was created from. */
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);

	raid_io->type = type;
	raid_io->offset_blocks = offset_blocks;
	raid_io->num_blocks = num_blocks;
	raid_io->iovs = iovs;
	raid_io->iovcnt = iovcnt;
	raid_io->memory_domain = memory_domain;
	raid_io->memory_domain_ctx = memory_domain_ctx;
	raid_io->md_buf = md_buf;

	raid_io->raid_bdev = raid_bdev;
	raid_io->raid_ch = raid_ch;
	raid_io->base_bdev_io_remaining = 0;
	raid_io->base_bdev_io_submitted = 0;
	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	raid_io->completion_cb = NULL;
	/* No split in progress initially. */
	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
}
812 
/*
 * brief:
 * raid_bdev_submit_request function is the submit_request function pointer of
 * raid bdev function table. This is used to submit the io on raid_bdev to below
 * layers.
 * params:
 * ch - pointer to raid bdev io channel
 * bdev_io - pointer to parent bdev_io on raid bdev device
 * returns:
 * none
 */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need a data buffer allocated first. */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload requests are not supported while a process is running. */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
863 
864 /*
865  * brief:
866  * _raid_bdev_io_type_supported checks whether io_type is supported in
867  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
868  * doesn't support, the raid device doesn't supports.
869  *
870  * params:
871  * raid_bdev - pointer to raid bdev context
872  * io_type - io type
873  * returns:
874  * true - io_type is supported
875  * false - io_type is not supported
876  */
877 inline static bool
878 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
879 {
880 	struct raid_base_bdev_info *base_info;
881 
882 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
883 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
884 		if (raid_bdev->module->submit_null_payload_request == NULL) {
885 			return false;
886 		}
887 	}
888 
889 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
890 		if (base_info->desc == NULL) {
891 			continue;
892 		}
893 
894 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
895 			return false;
896 		}
897 	}
898 
899 	return true;
900 }
901 
902 /*
903  * brief:
904  * raid_bdev_io_type_supported is the io_supported function for bdev function
905  * table which returns whether the particular io type is supported or not by
906  * raid bdev module
907  * params:
908  * ctx - pointer to raid bdev context
909  * type - io type
910  * returns:
911  * true - io_type is supported
912  * false - io_type is not supported
913  */
914 static bool
915 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
916 {
917 	switch (io_type) {
918 	case SPDK_BDEV_IO_TYPE_READ:
919 	case SPDK_BDEV_IO_TYPE_WRITE:
920 		return true;
921 
922 	case SPDK_BDEV_IO_TYPE_FLUSH:
923 	case SPDK_BDEV_IO_TYPE_RESET:
924 	case SPDK_BDEV_IO_TYPE_UNMAP:
925 		return _raid_bdev_io_type_supported(ctx, io_type);
926 
927 	default:
928 		return false;
929 	}
930 
931 	return false;
932 }
933 
/*
 * brief:
 * raid_bdev_get_io_channel is the get_io_channel function table pointer for
 * raid bdev. The raid_bdev pointer doubles as the io_device registered with
 * the SPDK thread library, so the channel is looked up directly from it.
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * pointer to io channel for raid bdev
 */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	return spdk_get_io_channel((struct raid_bdev *)ctxt);
}
950 
/*
 * Write the raid bdev's configuration and state (including any running
 * background process and the per-base-bdev list) into an already-open JSON
 * object. Must be called from the app thread.
 */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	assert(raid_bdev != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		/* The double expression is implicitly truncated to a uint32 percentage. */
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		/* Name may be NULL for a base bdev that was never (or is no longer) present. */
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			spdk_json_write_null(w);
		}
		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
		spdk_json_write_named_string(w, "uuid", uuid_str);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
1003 
1004 /*
1005  * brief:
1006  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1007  * params:
1008  * ctx - pointer to raid_bdev
1009  * w - pointer to json context
1010  * returns:
1011  * 0 - success
1012  * non zero - failure
1013  */
1014 static int
1015 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1016 {
1017 	struct raid_bdev *raid_bdev = ctx;
1018 
1019 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1020 
1021 	/* Dump the raid bdev configuration related information */
1022 	spdk_json_write_named_object_begin(w, "raid");
1023 	raid_bdev_write_info_json(raid_bdev, w);
1024 	spdk_json_write_object_end(w);
1025 
1026 	return 0;
1027 }
1028 
1029 /*
1030  * brief:
1031  * raid_bdev_write_config_json is the function table pointer for raid bdev
1032  * params:
1033  * bdev - pointer to spdk_bdev
1034  * w - pointer to json context
1035  * returns:
1036  * none
1037  */
1038 static void
1039 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1040 {
1041 	struct raid_bdev *raid_bdev = bdev->ctxt;
1042 	struct raid_base_bdev_info *base_info;
1043 	char uuid_str[SPDK_UUID_STRING_LEN];
1044 
1045 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1046 
1047 	if (raid_bdev->sb != NULL) {
1048 		/* raid bdev configuration is stored in the superblock */
1049 		return;
1050 	}
1051 
1052 	spdk_json_write_object_begin(w);
1053 
1054 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1055 
1056 	spdk_json_write_named_object_begin(w, "params");
1057 	spdk_json_write_named_string(w, "name", bdev->name);
1058 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
1059 	spdk_json_write_named_string(w, "uuid", uuid_str);
1060 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1061 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1062 	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);
1063 
1064 	spdk_json_write_named_array_begin(w, "base_bdevs");
1065 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1066 		if (base_info->desc) {
1067 			spdk_json_write_string(w, spdk_bdev_desc_get_bdev(base_info->desc)->name);
1068 		}
1069 	}
1070 	spdk_json_write_array_end(w);
1071 	spdk_json_write_object_end(w);
1072 
1073 	spdk_json_write_object_end(w);
1074 }
1075 
1076 static int
1077 raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1078 {
1079 	struct raid_bdev *raid_bdev = ctx;
1080 	struct raid_base_bdev_info *base_info;
1081 	int domains_count = 0, rc = 0;
1082 
1083 	if (raid_bdev->module->memory_domains_supported == false) {
1084 		return 0;
1085 	}
1086 
1087 	spdk_spin_lock(&raid_bdev->base_bdev_lock);
1088 
1089 	/* First loop to get the number of memory domains */
1090 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1091 		if (base_info->desc == NULL) {
1092 			continue;
1093 		}
1094 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
1095 		if (rc < 0) {
1096 			goto out;
1097 		}
1098 		domains_count += rc;
1099 	}
1100 
1101 	if (!domains || array_size < domains_count) {
1102 		goto out;
1103 	}
1104 
1105 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1106 		if (base_info->desc == NULL) {
1107 			continue;
1108 		}
1109 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
1110 		if (rc < 0) {
1111 			goto out;
1112 		}
1113 		domains += rc;
1114 		array_size -= rc;
1115 	}
1116 out:
1117 	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
1118 
1119 	if (rc < 0) {
1120 		return rc;
1121 	}
1122 
1123 	return domains_count;
1124 }
1125 
/* g_raid_bdev_fn_table is the function table for raid bdev.
 * These callbacks are invoked by the generic bdev layer for every
 * registered raid bdev (see spdk_bdev_register() in raid_bdev_configure_cont()).
 */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1136 
1137 struct raid_bdev *
1138 raid_bdev_find_by_name(const char *name)
1139 {
1140 	struct raid_bdev *raid_bdev;
1141 
1142 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1143 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1144 			return raid_bdev;
1145 		}
1146 	}
1147 
1148 	return NULL;
1149 }
1150 
/* Mapping of user-visible raid level names to enum values, terminated by an
 * empty entry. Several aliases may map to the same level (e.g. "raid0"/"0");
 * matching is case-insensitive (see raid_bdev_str_to_level()).
 */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1164 
/* Human-readable raid bdev state names, indexed by enum raid_bdev_state;
 * the RAID_BDEV_STATE_MAX slot is a NULL sentinel.
 */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};

/* Human-readable background process type names, indexed by enum raid_process_type;
 * the RAID_PROCESS_MAX slot is a NULL sentinel.
 */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1177 
1178 /* We have to use the typedef in the function declaration to appease astyle. */
1179 typedef enum raid_level raid_level_t;
1180 typedef enum raid_bdev_state raid_bdev_state_t;
1181 
1182 raid_level_t
1183 raid_bdev_str_to_level(const char *str)
1184 {
1185 	unsigned int i;
1186 
1187 	assert(str != NULL);
1188 
1189 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1190 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1191 			return g_raid_level_names[i].value;
1192 		}
1193 	}
1194 
1195 	return INVALID_RAID_LEVEL;
1196 }
1197 
1198 const char *
1199 raid_bdev_level_to_str(enum raid_level level)
1200 {
1201 	unsigned int i;
1202 
1203 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1204 		if (g_raid_level_names[i].value == level) {
1205 			return g_raid_level_names[i].name;
1206 		}
1207 	}
1208 
1209 	return "";
1210 }
1211 
1212 raid_bdev_state_t
1213 raid_bdev_str_to_state(const char *str)
1214 {
1215 	unsigned int i;
1216 
1217 	assert(str != NULL);
1218 
1219 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1220 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1221 			break;
1222 		}
1223 	}
1224 
1225 	return i;
1226 }
1227 
1228 const char *
1229 raid_bdev_state_to_str(enum raid_bdev_state state)
1230 {
1231 	if (state >= RAID_BDEV_STATE_MAX) {
1232 		return "";
1233 	}
1234 
1235 	return g_raid_state_names[state];
1236 }
1237 
1238 const char *
1239 raid_bdev_process_to_str(enum raid_process_type value)
1240 {
1241 	if (value >= RAID_PROCESS_MAX) {
1242 		return "";
1243 	}
1244 
1245 	return g_raid_process_type_names[value];
1246 }
1247 
1248 /*
1249  * brief:
1250  * raid_bdev_fini_start is called when bdev layer is starting the
1251  * shutdown process
1252  * params:
1253  * none
1254  * returns:
1255  * none
1256  */
1257 static void
1258 raid_bdev_fini_start(void)
1259 {
1260 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1261 	g_shutdown_started = true;
1262 }
1263 
1264 /*
1265  * brief:
1266  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1267  * params:
1268  * none
1269  * returns:
1270  * none
1271  */
1272 static void
1273 raid_bdev_exit(void)
1274 {
1275 	struct raid_bdev *raid_bdev, *tmp;
1276 
1277 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1278 
1279 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1280 		raid_bdev_cleanup_and_free(raid_bdev);
1281 	}
1282 }
1283 
/* Emit a "bdev_raid_set_options" RPC entry reproducing the current
 * module-level options on config replay.
 */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1297 
/* bdev module config_json callback: dump module-level configuration.
 * Per-raid-bdev configuration is emitted via raid_bdev_write_config_json().
 * Always returns 0.
 */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1305 
1306 /*
1307  * brief:
1308  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1309  * module
1310  * params:
1311  * none
1312  * returns:
1313  * size of spdk_bdev_io context for raid
1314  */
1315 static int
1316 raid_bdev_get_ctx_size(void)
1317 {
1318 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
1319 	return sizeof(struct raid_bdev_io);
1320 }
1321 
/* raid bdev module descriptor registered with the generic bdev layer;
 * init/fini are synchronous (async_init/async_fini false).
 */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1334 
1335 /*
1336  * brief:
1337  * raid_bdev_init is the initialization function for raid bdev module
1338  * params:
1339  * none
1340  * returns:
1341  * 0 - success
1342  * non zero - failure
1343  */
1344 static int
1345 raid_bdev_init(void)
1346 {
1347 	return 0;
1348 }
1349 
1350 static int
1351 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1352 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1353 		  struct raid_bdev **raid_bdev_out)
1354 {
1355 	struct raid_bdev *raid_bdev;
1356 	struct spdk_bdev *raid_bdev_gen;
1357 	struct raid_bdev_module *module;
1358 	struct raid_base_bdev_info *base_info;
1359 	uint8_t min_operational;
1360 
1361 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1362 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1363 		return -EINVAL;
1364 	}
1365 
1366 	if (raid_bdev_find_by_name(name) != NULL) {
1367 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1368 		return -EEXIST;
1369 	}
1370 
1371 	if (level == RAID1) {
1372 		if (strip_size != 0) {
1373 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1374 			return -EINVAL;
1375 		}
1376 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1377 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1378 		return -EINVAL;
1379 	}
1380 
1381 	module = raid_bdev_module_find(level);
1382 	if (module == NULL) {
1383 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1384 		return -EINVAL;
1385 	}
1386 
1387 	assert(module->base_bdevs_min != 0);
1388 	if (num_base_bdevs < module->base_bdevs_min) {
1389 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1390 			    module->base_bdevs_min,
1391 			    raid_bdev_level_to_str(level));
1392 		return -EINVAL;
1393 	}
1394 
1395 	switch (module->base_bdevs_constraint.type) {
1396 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1397 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1398 		break;
1399 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1400 		min_operational = module->base_bdevs_constraint.value;
1401 		break;
1402 	case CONSTRAINT_UNSET:
1403 		if (module->base_bdevs_constraint.value != 0) {
1404 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1405 				    (uint8_t)module->base_bdevs_constraint.value, name);
1406 			return -EINVAL;
1407 		}
1408 		min_operational = num_base_bdevs;
1409 		break;
1410 	default:
1411 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1412 			    (uint8_t)module->base_bdevs_constraint.type,
1413 			    raid_bdev_level_to_str(module->level));
1414 		return -EINVAL;
1415 	};
1416 
1417 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1418 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1419 			    raid_bdev_level_to_str(module->level));
1420 		return -EINVAL;
1421 	}
1422 
1423 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1424 	if (!raid_bdev) {
1425 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1426 		return -ENOMEM;
1427 	}
1428 
1429 	spdk_spin_init(&raid_bdev->base_bdev_lock);
1430 	raid_bdev->module = module;
1431 	raid_bdev->num_base_bdevs = num_base_bdevs;
1432 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1433 					   sizeof(struct raid_base_bdev_info));
1434 	if (!raid_bdev->base_bdev_info) {
1435 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1436 		raid_bdev_free(raid_bdev);
1437 		return -ENOMEM;
1438 	}
1439 
1440 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1441 		base_info->raid_bdev = raid_bdev;
1442 	}
1443 
1444 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1445 	 * internally and set later.
1446 	 */
1447 	raid_bdev->strip_size = 0;
1448 	raid_bdev->strip_size_kb = strip_size;
1449 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1450 	raid_bdev->level = level;
1451 	raid_bdev->min_base_bdevs_operational = min_operational;
1452 
1453 	if (superblock_enabled) {
1454 		raid_bdev->sb = spdk_dma_zmalloc(RAID_BDEV_SB_MAX_LENGTH, 0x1000, NULL);
1455 		if (!raid_bdev->sb) {
1456 			SPDK_ERRLOG("Failed to allocate raid bdev sb buffer\n");
1457 			raid_bdev_free(raid_bdev);
1458 			return -ENOMEM;
1459 		}
1460 	}
1461 
1462 	raid_bdev_gen = &raid_bdev->bdev;
1463 
1464 	raid_bdev_gen->name = strdup(name);
1465 	if (!raid_bdev_gen->name) {
1466 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1467 		raid_bdev_free(raid_bdev);
1468 		return -ENOMEM;
1469 	}
1470 
1471 	raid_bdev_gen->product_name = "Raid Volume";
1472 	raid_bdev_gen->ctxt = raid_bdev;
1473 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1474 	raid_bdev_gen->module = &g_raid_if;
1475 	raid_bdev_gen->write_cache = 0;
1476 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1477 
1478 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1479 
1480 	*raid_bdev_out = raid_bdev;
1481 
1482 	return 0;
1483 }
1484 
1485 /*
1486  * brief:
1487  * raid_bdev_create allocates raid bdev based on passed configuration
1488  * params:
1489  * name - name for raid bdev
1490  * strip_size - strip size in KB
1491  * num_base_bdevs - number of base bdevs
1492  * level - raid level
1493  * superblock_enabled - true if raid should have superblock
1494  * uuid - uuid to set for the bdev
1495  * raid_bdev_out - the created raid bdev
1496  * returns:
1497  * 0 - success
1498  * non zero - failure
1499  */
1500 int
1501 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1502 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1503 		 struct raid_bdev **raid_bdev_out)
1504 {
1505 	struct raid_bdev *raid_bdev;
1506 	int rc;
1507 
1508 	assert(uuid != NULL);
1509 
1510 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1511 			       &raid_bdev);
1512 	if (rc != 0) {
1513 		return rc;
1514 	}
1515 
1516 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1517 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1518 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1519 	}
1520 
1521 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1522 
1523 	*raid_bdev_out = raid_bdev;
1524 
1525 	return 0;
1526 }
1527 
/* Final step of unregistering: close the raid bdev's own descriptor that was
 * opened in raid_bdev_configure_cont(); runs on the app thread (see
 * raid_bdev_unregistering_cont()).
 */
static void
_raid_bdev_unregistering_cont(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;

	spdk_bdev_close(raid_bdev->self_desc);
	raid_bdev->self_desc = NULL;
}
1536 
/* Bounce to the app thread before closing the raid bdev's own descriptor. */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1542 
1543 static int
1544 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1545 {
1546 	struct raid_process_finish_action *finish_action;
1547 
1548 	assert(spdk_get_thread() == process->thread);
1549 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1550 
1551 	finish_action = calloc(1, sizeof(*finish_action));
1552 	if (finish_action == NULL) {
1553 		return -ENOMEM;
1554 	}
1555 
1556 	finish_action->cb = cb;
1557 	finish_action->cb_ctx = cb_ctx;
1558 
1559 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1560 
1561 	return 0;
1562 }
1563 
/* Runs on the process thread: request the background process to stop and,
 * once it has finished, continue unregistering the raid bdev.
 */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	if (process->status == 0) {
		/* preserve any earlier failure status; otherwise mark as canceled */
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1582 
1583 static void
1584 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1585 {
1586 	struct raid_bdev *raid_bdev = event_ctx;
1587 
1588 	switch (type) {
1589 	case SPDK_BDEV_EVENT_REMOVE:
1590 		if (raid_bdev->process != NULL) {
1591 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1592 					     raid_bdev->process);
1593 		} else {
1594 			raid_bdev_unregistering_cont(raid_bdev);
1595 		}
1596 		break;
1597 	default:
1598 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1599 		break;
1600 	}
1601 }
1602 
/* Second stage of configuration (after an optional superblock write):
 * register the io_device and the bdev with the bdev layer and open an
 * internal descriptor on it. On failure, everything is unwound and the
 * raid bdev is returned to the CONFIGURING state.
 */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	/* Mark online before registering so IO channels can be created */
	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Unwind: stop the module, drop the io_device, fall back to CONFIGURING */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1651 
1652 static void
1653 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1654 {
1655 	if (status == 0) {
1656 		raid_bdev_configure_cont(raid_bdev);
1657 	} else {
1658 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1659 			    raid_bdev->bdev.name, spdk_strerror(-status));
1660 		if (raid_bdev->module->stop != NULL) {
1661 			raid_bdev->module->stop(raid_bdev);
1662 		}
1663 	}
1664 }
1665 
1666 /*
1667  * brief:
1668  * If raid bdev config is complete, then only register the raid bdev to
1669  * bdev layer and remove this raid bdev from configuring list and
1670  * insert the raid bdev to configured list
1671  * params:
1672  * raid_bdev - pointer to raid bdev
1673  * returns:
1674  * 0 - success
1675  * non zero - failure
1676  */
1677 static int
1678 raid_bdev_configure(struct raid_bdev *raid_bdev)
1679 {
1680 	int rc;
1681 
1682 	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
1683 	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
1684 	assert(raid_bdev->bdev.blocklen > 0);
1685 
1686 	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
1687 	 * internal use.
1688 	 */
1689 	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / raid_bdev->bdev.blocklen;
1690 	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
1691 		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
1692 		return -EINVAL;
1693 	}
1694 	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
1695 	raid_bdev->blocklen_shift = spdk_u32log2(raid_bdev->bdev.blocklen);
1696 
1697 	rc = raid_bdev->module->start(raid_bdev);
1698 	if (rc != 0) {
1699 		SPDK_ERRLOG("raid module startup callback failed\n");
1700 		return rc;
1701 	}
1702 
1703 	if (raid_bdev->sb != NULL) {
1704 		if (spdk_uuid_is_null(&raid_bdev->sb->uuid)) {
1705 			/* NULL UUID is not valid in the sb so it means that we are creating a new
1706 			 * raid bdev and should initialize the superblock.
1707 			 */
1708 			raid_bdev_init_superblock(raid_bdev);
1709 		} else {
1710 			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
1711 			if (raid_bdev->sb->block_size != raid_bdev->bdev.blocklen) {
1712 				SPDK_ERRLOG("blocklen does not match value in superblock\n");
1713 				rc = -EINVAL;
1714 			}
1715 			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
1716 				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
1717 				rc = -EINVAL;
1718 			}
1719 			if (rc != 0) {
1720 				if (raid_bdev->module->stop != NULL) {
1721 					raid_bdev->module->stop(raid_bdev);
1722 				}
1723 				return rc;
1724 			}
1725 		}
1726 
1727 		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
1728 	} else {
1729 		raid_bdev_configure_cont(raid_bdev);
1730 	}
1731 
1732 	return 0;
1733 }
1734 
1735 /*
1736  * brief:
1737  * If raid bdev is online and registered, change the bdev state to
1738  * configuring and unregister this raid device. Queue this raid device
1739  * in configuring list
1740  * params:
1741  * raid_bdev - pointer to raid bdev
1742  * cb_fn - callback function
1743  * cb_arg - argument to callback function
1744  * returns:
1745  * none
1746  */
1747 static void
1748 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1749 		      void *cb_arg)
1750 {
1751 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1752 		if (cb_fn) {
1753 			cb_fn(cb_arg, 0);
1754 		}
1755 		return;
1756 	}
1757 
1758 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1759 	assert(raid_bdev->num_base_bdevs_discovered);
1760 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1761 
1762 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1763 }
1764 
1765 /*
1766  * brief:
1767  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1768  * params:
1769  * base_bdev - pointer to base bdev
1770  * returns:
1771  * base bdev info if found, otherwise NULL.
1772  */
1773 static struct raid_base_bdev_info *
1774 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1775 {
1776 	struct raid_bdev *raid_bdev;
1777 	struct raid_base_bdev_info *base_info;
1778 
1779 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1780 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1781 			if (base_info->desc != NULL &&
1782 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1783 				return base_info;
1784 			}
1785 		}
1786 	}
1787 
1788 	return NULL;
1789 }
1790 
/* Complete a scheduled base bdev removal: clear the pending flag and notify
 * the requester's callback, if one was registered.
 */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	assert(base_info->remove_scheduled);

	base_info->remove_scheduled = false;
	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1801 
/* Superblock write completion for a base bdev removal: log failures and
 * finish the removal with the write status.
 */
static void
raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1814 
/* Unquiesce completion during base bdev removal: release the base bdev's
 * resources and, when a superblock exists, persist the FAILED state of the
 * removed slot before signaling completion.
 */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	raid_bdev_free_base_bdev_resource(base_info);
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry for this slot and mark it failed */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* TODO: distinguish between failure and intentional removal */
				sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

				/* Completion continues in raid_bdev_remove_base_bdev_write_sb_cb() */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1852 
/* Per-channel step of base bdev removal: drop this raid channel's reference
 * to the removed base bdev's IO channel and clear the mirrored slot in the
 * background-process channel, if any.
 */
static void
raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	uint8_t idx = raid_bdev_base_bdev_slot(base_info);

	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);

	if (raid_ch->base_channel[idx] != NULL) {
		spdk_put_io_channel(raid_ch->base_channel[idx]);
		raid_ch->base_channel[idx] = NULL;
	}

	if (raid_ch->process.ch_processed != NULL) {
		/* NOTE(review): only cleared, not put - ch_processed appears to alias
		 * channels owned elsewhere; confirm ownership before changing this.
		 */
		raid_ch->process.ch_processed->base_channel[idx] = NULL;
	}

	spdk_for_each_channel_continue(i, 0);
}
1874 
/* All channels have dropped the removed base bdev - unquiesce the raid bdev;
 * completion continues in raid_bdev_remove_base_bdev_on_unquiesced().
 */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1884 
/* Quiesce completion during base bdev removal: with IO paused, walk every
 * raid channel to drop references to the removed base bdev's channels.
 */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
			      raid_bdev_channels_remove_base_bdev_done);
}
1901 
/* Start quiescing the raid bdev before detaching a base bdev; continues in
 * raid_bdev_remove_base_bdev_on_quiesced(). Must run on the app thread.
 * Returns the spdk_bdev_quiesce() result.
 */
static int
raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
}
1910 
/* Message context used when a base bdev removal must be coordinated with a
 * running background process. Values are snapshotted because the process
 * thread must not read raid_bdev's fields directly (see
 * raid_bdev_process_base_bdev_remove()).
 */
struct raid_bdev_process_base_bdev_remove_ctx {
	struct raid_bdev_process *process;
	struct raid_base_bdev_info *base_info;
	/* raid_bdev->num_base_bdevs_operational at the time of scheduling */
	uint8_t num_base_bdevs_operational;
};
1916 
/* App-thread continuation of a deferred base bdev removal: kick off the
 * quiesce sequence and report failure to the requester if it cannot start.
 */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc;

	rc = raid_bdev_remove_base_bdev_quiesce(base_info);
	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
1928 
/* Process-thread trampoline: extract the base bdev info, release the message
 * context, and hand the removal back to the app thread.
 */
static void
raid_bdev_process_base_bdev_remove_cont(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_base_bdev_info *base_info = ctx->base_info;

	/* ctx is no longer needed once base_info has been extracted */
	free(ctx);

	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
			     base_info);
}
1940 
/* Runs on the process thread: decide whether the background process must be
 * stopped before the base bdev can be removed. Removal of the process target,
 * or dropping to the minimum operational count, forces a stop.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the actual removal until the process has fully stopped */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		/* the device being removed is the reason the process stops */
		process->status = -ENODEV;
	}
}
1971 
/* Schedule a base bdev removal that must coordinate with a running
 * background process. Runs on the app thread; the decision itself is made on
 * the process thread. Returns 0 on success or -ENOMEM.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	ctx->num_base_bdevs_operational = process->raid_bdev->num_base_bdevs_operational;

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
1999 
/* Schedule removal of a base bdev from its raid bdev. Must be called on the
 * app thread. Returns 0 if the removal was started (or completed inline),
 * -ENODEV if a removal is already scheduled, or an error from the chosen
 * removal path. */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* A removal is already in progress for this base bdev. */
	if (base_info->remove_scheduled) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;
	base_info->remove_cb = cb_fn;
	base_info->remove_cb_ctx = cb_ctx;

	/*
	 * Note: the first "else if" below uses post-decrement, so whenever the raid bdev
	 * is ONLINE, num_base_bdevs_operational is decremented regardless of which of the
	 * else branches is taken.
	 */
	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		if (raid_bdev->num_base_bdevs_discovered == 0) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
	} else if (raid_bdev->num_base_bdevs_operational-- == raid_bdev->min_base_bdevs_operational) {
		/*
		 * After this base bdev is removed there will not be enough base bdevs
		 * to keep the raid bdev operational.
		 */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else if (raid_bdev->process != NULL) {
		/* A background process is running - it has to coordinate the removal. */
		ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
	} else {
		/* Quiesce IO before detaching the base bdev. */
		ret = raid_bdev_remove_base_bdev_quiesce(base_info);
	}

	if (ret != 0) {
		/* The removal did not start - clear the flag so it can be retried. */
		base_info->remove_scheduled = false;
	}
	return ret;
}
2050 
2051 /*
2052  * brief:
2053  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2054  * is removed. This function checks if this base bdev is part of any raid bdev
2055  * or not. If yes, it takes necessary action on that particular raid bdev.
2056  * params:
2057  * base_bdev - pointer to base bdev which got removed
2058  * cb_fn - callback function
2059  * cb_arg - argument to callback function
2060  * returns:
2061  * 0 - success
2062  * non zero - failure
2063  */
2064 int
2065 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2066 {
2067 	struct raid_base_bdev_info *base_info;
2068 
2069 	/* Find the raid_bdev which has claimed this base_bdev */
2070 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2071 	if (!base_info) {
2072 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2073 		return -ENODEV;
2074 	}
2075 
2076 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2077 }
2078 
2079 /*
2080  * brief:
2081  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2082  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2083  * If yes, call module handler to resize the raid_bdev if implemented.
2084  * params:
2085  * base_bdev - pointer to base bdev which got resized.
2086  * returns:
2087  * none
2088  */
2089 static void
2090 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2091 {
2092 	struct raid_bdev *raid_bdev;
2093 	struct raid_base_bdev_info *base_info;
2094 
2095 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2096 
2097 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2098 
2099 	/* Find the raid_bdev which has claimed this base_bdev */
2100 	if (!base_info) {
2101 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2102 		return;
2103 	}
2104 	raid_bdev = base_info->raid_bdev;
2105 
2106 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2107 
2108 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2109 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2110 
2111 	if (raid_bdev->module->resize) {
2112 		raid_bdev->module->resize(raid_bdev);
2113 	}
2114 }
2115 
2116 /*
2117  * brief:
2118  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2119  * triggers asynchronous event.
2120  * params:
2121  * type - event details.
2122  * bdev - bdev that triggered event.
2123  * event_ctx - context for event.
2124  * returns:
2125  * none
2126  */
2127 static void
2128 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2129 			  void *event_ctx)
2130 {
2131 	int rc;
2132 
2133 	switch (type) {
2134 	case SPDK_BDEV_EVENT_REMOVE:
2135 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2136 		if (rc != 0) {
2137 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2138 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2139 		}
2140 		break;
2141 	case SPDK_BDEV_EVENT_RESIZE:
2142 		raid_bdev_resize_base_bdev(bdev);
2143 		break;
2144 	default:
2145 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2146 		break;
2147 	}
2148 }
2149 
2150 /*
2151  * brief:
2152  * Deletes the specified raid bdev
2153  * params:
2154  * raid_bdev - pointer to raid bdev
2155  * cb_fn - callback function
2156  * cb_arg - argument to callback function
2157  */
void
raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);

	/* Only one delete may be in flight per raid bdev. */
	if (raid_bdev->destroy_started) {
		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
			      raid_bdev->bdev.name);
		if (cb_fn) {
			cb_fn(cb_arg, -EALREADY);
		}
		return;
	}

	raid_bdev->destroy_started = true;

	/* Mark every base bdev as scheduled for removal before tearing anything down. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		base_info->remove_scheduled = true;

		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
			/*
			 * As raid bdev is not registered yet or already unregistered,
			 * so cleanup should be done here itself.
			 */
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* There is no base bdev for this raid, so free the raid device. */
		raid_bdev_cleanup_and_free(raid_bdev);
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
	} else {
		/* Base bdevs remain - go through the normal deconfigure path. */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
	}
}
2198 
2199 static void
2200 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2201 {
2202 	if (status != 0) {
2203 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2204 			    raid_bdev->bdev.name, spdk_strerror(-status));
2205 	}
2206 }
2207 
2208 static void
2209 raid_bdev_process_finish_write_sb(void *ctx)
2210 {
2211 	struct raid_bdev *raid_bdev = ctx;
2212 	struct raid_bdev_superblock *sb = raid_bdev->sb;
2213 	struct raid_bdev_sb_base_bdev *sb_base_bdev;
2214 	struct raid_base_bdev_info *base_info;
2215 	uint8_t i;
2216 
2217 	for (i = 0; i < sb->base_bdevs_size; i++) {
2218 		sb_base_bdev = &sb->base_bdevs[i];
2219 
2220 		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
2221 		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2222 			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2223 			if (base_info->is_configured) {
2224 				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
2225 				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
2226 			}
2227 		}
2228 	}
2229 
2230 	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
2231 }
2232 
2233 static void raid_bdev_process_free(struct raid_bdev_process *process);
2234 
2235 static void
2236 _raid_bdev_process_finish_done(void *ctx)
2237 {
2238 	struct raid_bdev_process *process = ctx;
2239 	struct raid_process_finish_action *finish_action;
2240 
2241 	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
2242 		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
2243 		finish_action->cb(finish_action->cb_ctx);
2244 		free(finish_action);
2245 	}
2246 
2247 	raid_bdev_process_free(process);
2248 
2249 	spdk_thread_exit(spdk_get_thread());
2250 }
2251 
/* Callback after the process target was removed during finish; continues the
 * teardown on the process thread even if the removal failed. */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2263 
2264 static void
2265 raid_bdev_process_finish_unquiesced(void *ctx, int status)
2266 {
2267 	struct raid_bdev_process *process = ctx;
2268 
2269 	if (status != 0) {
2270 		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
2271 	}
2272 
2273 	if (process->status != 0) {
2274 		struct raid_base_bdev_info *target = process->target;
2275 
2276 		if (target->desc != NULL && target->remove_scheduled == false) {
2277 			_raid_bdev_remove_base_bdev(target, raid_bdev_process_finish_target_removed, process);
2278 			return;
2279 		}
2280 	}
2281 
2282 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2283 }
2284 
/* Unquiesce the raid bdev after the process released its resources.
 * Runs on the app thread. */
static void
raid_bdev_process_finish_unquiesce(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	int rc;

	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_process_finish_unquiesced, process);
	if (rc != 0) {
		/* Could not start the unquiesce - deliver the error to the callback directly. */
		raid_bdev_process_finish_unquiesced(process, rc);
	}
}
2297 
/* Runs on the process thread once all channels dropped their process state.
 * Releases the process IO channel, logs the result, persists the superblock
 * on success and hands off to the app thread to unquiesce the raid bdev. */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		/* Superblock updates must happen on the app thread. */
		if (raid_bdev->sb != NULL) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2329 
/* Channel iteration completed - continue the teardown on the process thread. */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2337 
/* Per-channel step of process teardown. On success, the target's process
 * channel becomes the regular base channel for its slot, i.e. the target is
 * promoted to a normal array member on this IO channel. */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* Hand the target channel over; clear it so cleanup won't release it. */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2356 
/* Runs after the raid bdev was quiesced for process teardown. Detaches the
 * process from the raid bdev, then walks all channels to drop their process
 * state. */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): on quiesce failure the finish sequence stops here -
		 * no retry or cleanup is visible in this path. */
		return;
	}

	raid_bdev->process = NULL;
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2372 
/* Quiesce the raid bdev before dismantling the process. Runs on the app thread. */
static void
_raid_bdev_process_finish(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	int rc;

	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
			       raid_bdev_process_finish_quiesced, process);
	if (rc != 0) {
		/* Could not start the quiesce - deliver the error to the callback directly. */
		raid_bdev_process_finish_quiesced(ctx, rc);
	}
}
2385 
/* Kick off process teardown by moving execution to the app thread. */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2391 
2392 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2393 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2394 
/* Request the process to finish with the given status. Must be called on the
 * process thread. Safe to call multiple times - only the first non-zero
 * status is kept and only the first call transitions to STOPPING. */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	/* Already stopping or stopped - nothing more to do. */
	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->window_range_locked) {
		/* Release the locked range first; the state machine resumes afterwards. */
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2417 
/* Callback after the current process window's LBA range was unquiesced.
 * Advances the window and re-enters the process state machine. */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	/* window_size holds the amount actually processed in this window. */
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2434 
/* Unquiesce the LBA range that was locked for the current process window. */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	/* Must mirror the quiesce_range call: same offset and max_window_size. */
	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2449 
/* All channels now see the new process offset - release the window range. */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2457 
/* Publish the end of the just-completed window as this channel's process offset. */
static void
raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	raid_ch->process.offset = process->window_offset + process->window_size;

	spdk_for_each_channel_continue(i, 0);
}
2469 
/* Completion callback invoked by raid modules for a process request.
 * Returns the request to the free list and, once the whole window has
 * drained, either finishes the process (on error) or propagates the new
 * offset to all channels. Must be called on the process thread. */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Keep the first error seen in this window. */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2495 
/* Submit one process request covering up to num_blocks starting at
 * offset_blocks. Returns the number of blocks actually submitted (the module
 * may process fewer than requested), 0 if no request could be submitted now,
 * or a negative errno on failure. */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests are in flight; more window work must be outstanding. */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module accepted ret blocks - record that and take the request off the free list. */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2531 
/* Submit as much of the current window as possible. window_size ends up as
 * the number of blocks actually submitted; if nothing could be submitted the
 * process is finished with the current window status. */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			/* Out of free requests (0) or submission error (<0). */
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* Completions will drive the rest via raid_bdev_process_request_complete(). */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2556 
/* Callback after the LBA range for the next window was quiesced. Starts
 * processing the window, unless the process was asked to stop meanwhile. */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		/* Stop requested while locking - release the range and let teardown proceed. */
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2577 
/* Main step of the process state machine, called between windows on the
 * process thread: finish when stopping or when the whole bdev was covered,
 * otherwise quiesce the next window's LBA range and process it. */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window so it does not extend past the end of the bdev. */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		/* Could not start the quiesce - deliver the error to the callback directly. */
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2609 
/* First function run on the newly created process thread: acquire the raid
 * IO channel, mark the process RUNNING and enter the state machine. */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2634 
/* Abort path for a process that failed to start: remove the target base bdev
 * and free the never-started process. */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2645 
/* Per-channel abort step: undo any process setup done on this channel. */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2656 
/* Completion of per-channel process setup. On success, creates the dedicated
 * process thread and attaches the process to the raid bdev; on any failure,
 * rolls back the per-channel setup. */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* TODO: we may need to abort if a base bdev was removed before we got here */

	/* Thread name: "<raid bdev name>_<process type>". */
	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2693 
/* Per-channel process setup; a non-zero rc aborts the channel iteration. */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	int rc;

	rc = raid_bdev_ch_process_setup(raid_ch, process);

	spdk_for_each_channel_continue(i, rc);
}
2706 
/* Start a background process by preparing every raid IO channel for it. */
static void
raid_bdev_process_start(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* Only modules that implement submit_process_request can run processes. */
	assert(raid_bdev->module->submit_process_request != NULL);

	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
			      raid_bdev_channels_start_process_done);
}
2717 
/* Free a process request and its DMA buffers (both may be NULL-safe freed). */
static void
raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
{
	spdk_dma_free(process_req->iov.iov_base);
	spdk_dma_free(process_req->md_buf);
	free(process_req);
}
2725 
2726 static struct raid_bdev_process_request *
2727 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
2728 {
2729 	struct raid_bdev *raid_bdev = process->raid_bdev;
2730 	struct raid_bdev_process_request *process_req;
2731 
2732 	process_req = calloc(1, sizeof(*process_req));
2733 	if (process_req == NULL) {
2734 		return NULL;
2735 	}
2736 
2737 	process_req->process = process;
2738 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
2739 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
2740 	if (process_req->iov.iov_base == NULL) {
2741 		free(process_req);
2742 		return NULL;
2743 	}
2744 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
2745 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
2746 		if (process_req->md_buf == NULL) {
2747 			raid_bdev_process_request_free(process_req);
2748 			return NULL;
2749 		}
2750 	}
2751 
2752 	return process_req;
2753 }
2754 
2755 static void
2756 raid_bdev_process_free(struct raid_bdev_process *process)
2757 {
2758 	struct raid_bdev_process_request *process_req;
2759 
2760 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2761 		TAILQ_REMOVE(&process->requests, process_req, link);
2762 		raid_bdev_process_request_free(process_req);
2763 	}
2764 
2765 	free(process);
2766 }
2767 
/* Allocate and initialize a background process descriptor, including a pool
 * of RAID_BDEV_PROCESS_MAX_QD preallocated requests. Returns NULL on
 * allocation failure. */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	/* Window size: configured KiB converted to blocks (rounded up), but never
	 * smaller than the bdev's write unit size. */
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    raid_bdev->bdev.blocklen),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			/* Frees any requests allocated so far along with the process. */
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
2802 
2803 static int
2804 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
2805 {
2806 	struct raid_bdev_process *process;
2807 
2808 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2809 
2810 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
2811 	if (process == NULL) {
2812 		return -ENOMEM;
2813 	}
2814 
2815 	raid_bdev_process_start(process);
2816 
2817 	return 0;
2818 }
2819 
/* Continue base bdev configuration after the superblock check (or directly
 * when no check is needed). Counts the bdev as discovered and either
 * configures the raid bdev, starts a rebuild of this bdev, or just records it. */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	/* TODO: defer if rebuild in progress on another base bdev */
	assert(raid_bdev->process == NULL);

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
		/* A bdev beyond the operational set joined an online array - rebuild it in. */
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		/* Still waiting for more base bdevs to be discovered. */
		rc = 0;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
2863 
/* Callback for the superblock probe of a new base bdev. Only a bdev without
 * a valid raid superblock (-EINVAL) may continue configuration; an existing
 * superblock (-EEXIST) or read error aborts it. */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	switch (status) {
	case 0:
		/* valid superblock found */
		SPDK_ERRLOG("Existing raid superblock found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	/* Reached only on failure - report the error to the configure callback. */
	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, status);
	}
}
2891 
2892 static int
2893 raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
2894 			      raid_base_bdev_cb cb_fn, void *cb_ctx)
2895 {
2896 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2897 	struct spdk_bdev_desc *desc;
2898 	struct spdk_bdev *bdev;
2899 	const struct spdk_uuid *bdev_uuid;
2900 	int rc;
2901 
2902 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2903 	assert(base_info->desc == NULL);
2904 
2905 	/*
2906 	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
2907 	 * before claiming the bdev.
2908 	 */
2909 
2910 	if (!spdk_uuid_is_null(&base_info->uuid)) {
2911 		char uuid_str[SPDK_UUID_STRING_LEN];
2912 		const char *bdev_name;
2913 
2914 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
2915 
2916 		/* UUID of a bdev is registered as its alias */
2917 		bdev = spdk_bdev_get_by_name(uuid_str);
2918 		if (bdev == NULL) {
2919 			return -ENODEV;
2920 		}
2921 
2922 		bdev_name = spdk_bdev_get_name(bdev);
2923 
2924 		if (base_info->name == NULL) {
2925 			assert(existing == true);
2926 			base_info->name = strdup(bdev_name);
2927 			if (base_info->name == NULL) {
2928 				return -ENOMEM;
2929 			}
2930 		} else if (strcmp(base_info->name, bdev_name) != 0) {
2931 			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
2932 				    bdev_name, base_info->name);
2933 			return -EINVAL;
2934 		}
2935 	}
2936 
2937 	assert(base_info->name != NULL);
2938 
2939 	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
2940 	if (rc != 0) {
2941 		if (rc != -ENODEV) {
2942 			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
2943 		}
2944 		return rc;
2945 	}
2946 
2947 	bdev = spdk_bdev_desc_get_bdev(desc);
2948 	bdev_uuid = spdk_bdev_get_uuid(bdev);
2949 
2950 	if (spdk_uuid_is_null(&base_info->uuid)) {
2951 		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
2952 	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
2953 		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
2954 		spdk_bdev_close(desc);
2955 		return -EINVAL;
2956 	}
2957 
2958 	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
2959 	if (rc != 0) {
2960 		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
2961 		spdk_bdev_close(desc);
2962 		return rc;
2963 	}
2964 
2965 	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
2966 
2967 	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
2968 	if (base_info->app_thread_ch == NULL) {
2969 		SPDK_ERRLOG("Failed to get io channel\n");
2970 		spdk_bdev_module_release_bdev(bdev);
2971 		spdk_bdev_close(desc);
2972 		return -ENOMEM;
2973 	}
2974 
2975 	base_info->desc = desc;
2976 	base_info->blockcnt = bdev->blockcnt;
2977 
2978 	if (raid_bdev->sb != NULL) {
2979 		uint64_t data_offset;
2980 
2981 		if (base_info->data_offset == 0) {
2982 			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % bdev->blocklen) == 0);
2983 			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / bdev->blocklen;
2984 		} else {
2985 			data_offset = base_info->data_offset;
2986 		}
2987 
2988 		if (bdev->optimal_io_boundary != 0) {
2989 			data_offset = spdk_divide_round_up(data_offset,
2990 							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
2991 			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
2992 				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
2993 					     base_info->data_offset, base_info->name, data_offset);
2994 				data_offset = base_info->data_offset;
2995 			}
2996 		}
2997 
2998 		base_info->data_offset = data_offset;
2999 	}
3000 
3001 	if (base_info->data_offset >= bdev->blockcnt) {
3002 		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
3003 			    base_info->data_offset, bdev->blockcnt, base_info->name);
3004 		rc = -EINVAL;
3005 		goto out;
3006 	}
3007 
3008 	if (base_info->data_size == 0) {
3009 		base_info->data_size = bdev->blockcnt - base_info->data_offset;
3010 	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
3011 		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
3012 			    bdev->blockcnt, base_info->name);
3013 		rc = -EINVAL;
3014 		goto out;
3015 	}
3016 
3017 	/* Currently, RAID bdevs do not support DIF or DIX, so a RAID bdev cannot
3018 	 * be created on top of any bdev which supports it */
3019 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3020 		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
3021 			    bdev->name);
3022 		rc = -EINVAL;
3023 		goto out;
3024 	}
3025 
3026 	/*
3027 	 * Set the raid bdev properties if this is the first base bdev configured,
3028 	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
3029 	 * have the same blocklen and metadata format.
3030 	 */
3031 	if (raid_bdev->num_base_bdevs_discovered == 0) {
3032 		raid_bdev->bdev.blocklen = bdev->blocklen;
3033 		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
3034 		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
3035 	} else {
3036 		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
3037 			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
3038 				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
3039 			rc = -EINVAL;
3040 			goto out;
3041 		}
3042 
3043 		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
3044 		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev)) {
3045 			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
3046 				    raid_bdev->bdev.name, bdev->name);
3047 			rc = -EINVAL;
3048 			goto out;
3049 		}
3050 	}
3051 
3052 	base_info->configure_cb = cb_fn;
3053 	base_info->configure_cb_ctx = cb_ctx;
3054 
3055 	if (existing) {
3056 		raid_bdev_configure_base_bdev_cont(base_info);
3057 	} else {
3058 		/* check for existing superblock when using a new bdev */
3059 		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
3060 				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
3061 		if (rc) {
3062 			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3063 				    bdev->name, spdk_strerror(-rc));
3064 		}
3065 	}
3066 out:
3067 	if (rc != 0) {
3068 		raid_bdev_free_base_bdev_resource(base_info);
3069 	}
3070 	return rc;
3071 }
3072 
3073 static int
3074 _raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3075 			   uint64_t data_offset, uint64_t data_size,
3076 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3077 {
3078 	struct raid_base_bdev_info *base_info;
3079 
3080 	assert(name != NULL);
3081 
3082 	if (slot >= raid_bdev->num_base_bdevs) {
3083 		return -EINVAL;
3084 	}
3085 
3086 	base_info = &raid_bdev->base_bdev_info[slot];
3087 
3088 	if (base_info->name != NULL) {
3089 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev '%s'\n",
3090 			    slot, raid_bdev->bdev.name, base_info->name);
3091 		return -EBUSY;
3092 	}
3093 
3094 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3095 		char uuid_str[SPDK_UUID_STRING_LEN];
3096 
3097 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3098 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev with uuid %s\n",
3099 			    slot, raid_bdev->bdev.name, uuid_str);
3100 		return -EBUSY;
3101 	}
3102 
3103 	base_info->name = strdup(name);
3104 	if (base_info->name == NULL) {
3105 		return -ENOMEM;
3106 	}
3107 
3108 	base_info->data_offset = data_offset;
3109 	base_info->data_size = data_size;
3110 
3111 	return raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
3112 }
3113 
3114 int
3115 raid_bdev_attach_base_bdev(struct raid_bdev *raid_bdev, struct spdk_bdev *base_bdev,
3116 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3117 {
3118 	struct raid_base_bdev_info *base_info = NULL, *iter;
3119 	int rc;
3120 
3121 	SPDK_DEBUGLOG(bdev_raid, "attach_base_device: %s\n", base_bdev->name);
3122 
3123 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3124 
3125 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
3126 		SPDK_ERRLOG("raid bdev '%s' must be in online state to attach base bdev\n",
3127 			    raid_bdev->bdev.name);
3128 		return -EINVAL;
3129 	}
3130 
3131 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3132 		if (iter->desc == NULL) {
3133 			base_info = iter;
3134 			break;
3135 		}
3136 	}
3137 
3138 	if (base_info == NULL) {
3139 		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
3140 			    raid_bdev->bdev.name, base_bdev->name);
3141 		return -EINVAL;
3142 	}
3143 
3144 	assert(base_info->is_configured == false);
3145 	assert(base_info->data_size != 0);
3146 
3147 	spdk_spin_lock(&raid_bdev->base_bdev_lock);
3148 
3149 	rc = _raid_bdev_add_base_device(raid_bdev, base_bdev->name,
3150 					raid_bdev_base_bdev_slot(base_info),
3151 					base_info->data_offset, base_info->data_size,
3152 					cb_fn, cb_ctx);
3153 	if (rc != 0) {
3154 		SPDK_ERRLOG("base bdev '%s' attach failed: %s\n", base_bdev->name, spdk_strerror(-rc));
3155 		raid_bdev_free_base_bdev_resource(base_info);
3156 	}
3157 
3158 	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
3159 
3160 	return rc;
3161 }
3162 
3163 /*
3164  * brief:
3165  * raid_bdev_add_base_device function is the actual function which either adds
3166  * the nvme base device to existing raid bdev or create a new raid bdev. It also claims
3167  * the base device and keep the open descriptor.
3168  * params:
3169  * raid_bdev - pointer to raid bdev
3170  * name - name of the base bdev
3171  * slot - position to add base bdev
3172  * cb_fn - callback function
3173  * cb_ctx - argument to callback function
3174  * returns:
3175  * 0 - success
3176  * non zero - failure
3177  */
3178 int
3179 raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3180 			  raid_base_bdev_cb cb_fn, void *cb_ctx)
3181 {
3182 	return _raid_bdev_add_base_device(raid_bdev, name, slot, 0, 0, cb_fn, cb_ctx);
3183 }
3184 
3185 static int
3186 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3187 {
3188 	struct raid_bdev *raid_bdev;
3189 	uint8_t i;
3190 	int rc;
3191 
3192 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3193 			       sb->level, true, &sb->uuid, &raid_bdev);
3194 	if (rc != 0) {
3195 		return rc;
3196 	}
3197 
3198 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3199 	memcpy(raid_bdev->sb, sb, sb->length);
3200 
3201 	for (i = 0; i < sb->base_bdevs_size; i++) {
3202 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3203 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3204 
3205 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3206 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3207 			raid_bdev->num_base_bdevs_operational++;
3208 		}
3209 
3210 		base_info->data_offset = sb_base_bdev->data_offset;
3211 		base_info->data_size = sb_base_bdev->data_size;
3212 	}
3213 
3214 	*raid_bdev_out = raid_bdev;
3215 	return 0;
3216 }
3217 
3218 static void
3219 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3220 {
3221 	struct raid_bdev *raid_bdev;
3222 	struct raid_base_bdev_info *base_info;
3223 
3224 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3225 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3226 			if (base_info->desc == NULL && base_info->name != NULL &&
3227 			    strcmp(bdev->name, base_info->name) == 0) {
3228 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3229 				break;
3230 			}
3231 		}
3232 	}
3233 }
3234 
3235 static void
3236 raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev)
3237 {
3238 	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
3239 	struct raid_bdev *raid_bdev;
3240 	struct raid_base_bdev_info *iter, *base_info;
3241 	uint8_t i;
3242 	int rc;
3243 
3244 	if (sb->block_size != bdev->blocklen) {
3245 		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
3246 			     bdev->name, sb->block_size, bdev->blocklen);
3247 		return;
3248 	}
3249 
3250 	if (spdk_uuid_is_null(&sb->uuid)) {
3251 		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
3252 		return;
3253 	}
3254 
3255 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3256 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, &sb->uuid) == 0) {
3257 			break;
3258 		}
3259 	}
3260 
3261 	if (raid_bdev) {
3262 		if (sb->seq_number > raid_bdev->sb->seq_number) {
3263 			SPDK_DEBUGLOG(bdev_raid,
3264 				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
3265 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3266 
3267 			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
3268 				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
3269 					     raid_bdev->bdev.name, bdev->name);
3270 				return;
3271 			}
3272 
3273 			/* remove and then recreate the raid bdev using the newer superblock */
3274 			raid_bdev_delete(raid_bdev, NULL, NULL);
3275 			raid_bdev = NULL;
3276 		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
3277 			SPDK_DEBUGLOG(bdev_raid,
3278 				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
3279 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3280 			/* use the current raid bdev superblock */
3281 			sb = raid_bdev->sb;
3282 		}
3283 	}
3284 
3285 	for (i = 0; i < sb->base_bdevs_size; i++) {
3286 		sb_base_bdev = &sb->base_bdevs[i];
3287 
3288 		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);
3289 
3290 		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3291 			break;
3292 		}
3293 	}
3294 
3295 	if (i == sb->base_bdevs_size) {
3296 		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
3297 		return;
3298 	}
3299 
3300 	if (!raid_bdev) {
3301 		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
3302 		if (rc != 0) {
3303 			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
3304 				    sb->name, spdk_strerror(-rc));
3305 			return;
3306 		}
3307 	}
3308 
3309 	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
3310 		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
3311 			       bdev->name, raid_bdev->bdev.name);
3312 		return;
3313 	}
3314 
3315 	base_info = NULL;
3316 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3317 		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3318 			base_info = iter;
3319 			break;
3320 		}
3321 	}
3322 
3323 	if (base_info == NULL) {
3324 		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
3325 			    bdev->name, raid_bdev->bdev.name);
3326 		return;
3327 	}
3328 
3329 	rc = raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3330 	if (rc != 0) {
3331 		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3332 			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3333 	}
3334 }
3335 
/* Context for examining a bdev for an existing raid superblock */
struct raid_bdev_examine_ctx {
	/* Descriptor of the bdev being examined, opened read-only */
	struct spdk_bdev_desc *desc;
	/* IO channel on the examined bdev, used to read its superblock */
	struct spdk_io_channel *ch;
};
3340 
3341 static void
3342 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3343 {
3344 	if (!ctx) {
3345 		return;
3346 	}
3347 
3348 	if (ctx->ch) {
3349 		spdk_put_io_channel(ctx->ch);
3350 	}
3351 
3352 	if (ctx->desc) {
3353 		spdk_bdev_close(ctx->desc);
3354 	}
3355 
3356 	free(ctx);
3357 }
3358 
3359 static void
3360 raid_bdev_examine_load_sb_cb(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3361 {
3362 	struct raid_bdev_examine_ctx *ctx = _ctx;
3363 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3364 
3365 	switch (status) {
3366 	case 0:
3367 		/* valid superblock found */
3368 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3369 		raid_bdev_examine_sb(sb, bdev);
3370 		break;
3371 	case -EINVAL:
3372 		/* no valid superblock, check if it can be claimed anyway */
3373 		raid_bdev_examine_no_sb(bdev);
3374 		break;
3375 	default:
3376 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3377 			    bdev->name, spdk_strerror(-status));
3378 		break;
3379 	}
3380 
3381 	raid_bdev_examine_ctx_free(ctx);
3382 	spdk_bdev_module_examine_done(&g_raid_if);
3383 }
3384 
/* Event callback for the descriptor opened during examine. Intentionally a
 * no-op: the descriptor is short-lived and is closed as soon as the
 * superblock read completes (see raid_bdev_examine_ctx_free()). */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3389 
3390 /*
3391  * brief:
3392  * raid_bdev_examine function is the examine function call by the below layers
3393  * like bdev_nvme layer. This function will check if this base bdev can be
3394  * claimed by this raid bdev or not.
3395  * params:
3396  * bdev - pointer to base bdev
3397  * returns:
3398  * none
3399  */
3400 static void
3401 raid_bdev_examine(struct spdk_bdev *bdev)
3402 {
3403 	struct raid_bdev_examine_ctx *ctx;
3404 	int rc;
3405 
3406 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3407 		raid_bdev_examine_no_sb(bdev);
3408 		spdk_bdev_module_examine_done(&g_raid_if);
3409 		return;
3410 	}
3411 
3412 	ctx = calloc(1, sizeof(*ctx));
3413 	if (!ctx) {
3414 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3415 			    bdev->name, spdk_strerror(ENOMEM));
3416 		goto err;
3417 	}
3418 
3419 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, raid_bdev_examine_event_cb, NULL,
3420 				&ctx->desc);
3421 	if (rc) {
3422 		SPDK_ERRLOG("Failed to open bdev %s: %s\n",
3423 			    bdev->name, spdk_strerror(-rc));
3424 		goto err;
3425 	}
3426 
3427 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3428 	if (!ctx->ch) {
3429 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev->name);
3430 		goto err;
3431 	}
3432 
3433 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_cb, ctx);
3434 	if (rc) {
3435 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3436 			    bdev->name, spdk_strerror(-rc));
3437 		goto err;
3438 	}
3439 
3440 	return;
3441 err:
3442 	raid_bdev_examine_ctx_free(ctx);
3443 	spdk_bdev_module_examine_done(&g_raid_if);
3444 }
3445 
3446 /* Log component for bdev raid bdev module */
3447 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3448