xref: /spdk/module/bdev/raid/bdev_raid.c (revision fa3ab73844ced08f4f9487f5de71d477ca5cf604)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 #include "spdk/trace.h"
16 #include "spdk_internal/trace_defs.h"
17 
18 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
19 #define RAID_BDEV_PROCESS_MAX_QD	16
20 
21 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT	1024
22 #define RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT	0
23 
/* Set once module shutdown begins; checked in _raid_bdev_destruct() to decide
 * whether base bdev resources should be released. */
static bool g_shutdown_started = false;

/* List of all raid bdevs */
struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);

/* Registered raid level modules; populated via raid_bdev_module_list_add(). */
static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
30 
/*
 * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
 * contains the relationship of raid bdev io channel with base bdev io channels.
 */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs, indexed by base bdev slot */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Process window offset; RAID_OFFSET_BLOCKS_INVALID when no process is active */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Channel view used for I/O to the already-processed range; routes the
		 * target slot to target_ch instead of the regular base channel */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
49 
/* Lifecycle states of a background process (e.g. rebuild) on a raid bdev. */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
56 
/* Bandwidth throttling state for a background process.
 * NOTE(review): the accounting logic using these fields is outside this chunk;
 * field semantics below are inferred from names — confirm against the qos code. */
struct raid_process_qos {
	/* True when bandwidth limiting is in effect */
	bool enable_qos;
	/* Timestamp (in TSC ticks) of the last budget refill — TODO confirm */
	uint64_t last_tsc;
	/* Budget gained per TSC tick — TODO confirm */
	double bytes_per_tsc;
	/* Currently available byte budget — TODO confirm */
	double bytes_available;
	/* Upper cap on the accumulated budget — TODO confirm */
	double bytes_max;
	/* Poller used to resume the process once budget is available */
	struct spdk_poller *process_continue_poller;
};
65 
/* State of a background process (e.g. rebuild) running on a raid bdev.
 * The process advances through the bdev in windows of window_size blocks. */
struct raid_bdev_process {
	/* The raid bdev this process operates on */
	struct raid_bdev		*raid_bdev;
	enum raid_process_type		type;
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	/* Dedicated raid io channel for process I/O */
	struct raid_bdev_io_channel	*raid_ch;
	/* Pool of preallocated process requests */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	uint64_t			max_window_size;
	uint64_t			window_size;
	/* Outstanding work remaining in the current window */
	uint64_t			window_remaining;
	int				window_status;
	/* First block of the current window; copied into each channel's
	 * process.offset by raid_bdev_ch_process_setup() */
	uint64_t			window_offset;
	/* True while the current window's LBA range is locked (quiesced) */
	bool				window_range_locked;
	/* Base bdev being rebuilt/targeted by the process */
	struct raid_base_bdev_info	*target;
	int				status;
	/* Callbacks to invoke when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
	/* Bandwidth throttling state */
	struct raid_process_qos		qos;
};
84 
/* A deferred callback to run when a background process finishes. */
struct raid_process_finish_action {
	spdk_msg_fn cb;
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
90 
/* Global module options; read via raid_bdev_get_opts() and replaced via
 * raid_bdev_set_opts(). */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
	.process_max_bandwidth_mb_sec = RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT,
};
95 
/* Copy the current global raid bdev options into *opts. */
void
raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
{
	*opts = g_opts;
}
101 
102 int
103 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
104 {
105 	if (opts->process_window_size_kb == 0) {
106 		return -EINVAL;
107 	}
108 
109 	g_opts = *opts;
110 
111 	return 0;
112 }
113 
114 static struct raid_bdev_module *
115 raid_bdev_module_find(enum raid_level level)
116 {
117 	struct raid_bdev_module *raid_module;
118 
119 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
120 		if (raid_module->level == level) {
121 			return raid_module;
122 		}
123 	}
124 
125 	return NULL;
126 }
127 
128 void
129 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
130 {
131 	if (raid_bdev_module_find(raid_module->level) != NULL) {
132 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
133 			    raid_bdev_level_to_str(raid_module->level));
134 		assert(false);
135 	} else {
136 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
137 	}
138 }
139 
/* Return the IO channel for the base bdev at slot idx; may be NULL if the base
 * bdev is missing or is the process target (see raid_bdev_create_cb). */
struct spdk_io_channel *
raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
{
	return raid_ch->base_channel[idx];
}
145 
/* Return the raid module's private per-channel context. The module must have
 * provided a get_io_channel callback (module_channel is asserted non-NULL). */
void *
raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
{
	assert(raid_ch->module_channel != NULL);

	return spdk_io_channel_get_ctx(raid_ch->module_channel);
}
153 
154 struct raid_base_bdev_info *
155 raid_bdev_channel_get_base_info(struct raid_bdev_io_channel *raid_ch, struct spdk_bdev *base_bdev)
156 {
157 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
158 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
159 	uint8_t i;
160 
161 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
162 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[i];
163 
164 		if (base_info->is_configured &&
165 		    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
166 			return base_info;
167 		}
168 	}
169 
170 	return NULL;
171 }
172 
173 /* Function declarations */
174 static void	raid_bdev_examine(struct spdk_bdev *bdev);
175 static int	raid_bdev_init(void);
176 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
177 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
178 
179 static void
180 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
181 {
182 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
183 
184 	if (raid_ch->process.target_ch != NULL) {
185 		spdk_put_io_channel(raid_ch->process.target_ch);
186 		raid_ch->process.target_ch = NULL;
187 	}
188 
189 	if (raid_ch->process.ch_processed != NULL) {
190 		free(raid_ch->process.ch_processed->base_channel);
191 		free(raid_ch->process.ch_processed);
192 		raid_ch->process.ch_processed = NULL;
193 	}
194 }
195 
/*
 * Set up the per-channel state for a running background process: record the
 * current window offset, open an IO channel to the process target and build
 * the "processed" channel view in which the target's slot points to that
 * channel. On any failure everything allocated so far is released via
 * raid_bdev_ch_process_cleanup() and -ENOMEM is returned.
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* The processed view mirrors the regular channel array, except the target's
	 * slot, which is routed to the freshly opened target channel. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	/* Shared with the parent channel, not separately acquired. */
	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The processed view itself never splits I/O again. */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
245 
246 /*
247  * brief:
248  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
249  * hierarchy from raid bdev to base bdev io channels. It will be called per core
250  * params:
251  * io_device - pointer to raid bdev io device represented by raid_bdev
252  * ctx_buf - pointer to context buffer for raid bdev io channel
253  * returns:
254  * 0 - success
255  * non zero - failure
256  */
257 static int
258 raid_bdev_create_cb(void *io_device, void *ctx_buf)
259 {
260 	struct raid_bdev            *raid_bdev = io_device;
261 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
262 	uint8_t i;
263 	int ret = -ENOMEM;
264 
265 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);
266 
267 	assert(raid_bdev != NULL);
268 	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
269 
270 	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
271 	if (!raid_ch->base_channel) {
272 		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
273 		return -ENOMEM;
274 	}
275 
276 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
277 		/*
278 		 * Get the spdk_io_channel for all the base bdevs. This is used during
279 		 * split logic to send the respective child bdev ios to respective base
280 		 * bdev io channel.
281 		 * Skip missing base bdevs and the process target, which should also be treated as
282 		 * missing until the process completes.
283 		 */
284 		if (raid_bdev->base_bdev_info[i].is_configured == false ||
285 		    raid_bdev->base_bdev_info[i].is_process_target == true) {
286 			continue;
287 		}
288 		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
289 						   raid_bdev->base_bdev_info[i].desc);
290 		if (!raid_ch->base_channel[i]) {
291 			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
292 			goto err;
293 		}
294 	}
295 
296 	if (raid_bdev->module->get_io_channel) {
297 		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
298 		if (!raid_ch->module_channel) {
299 			SPDK_ERRLOG("Unable to create io channel for raid module\n");
300 			goto err;
301 		}
302 	}
303 
304 	if (raid_bdev->process != NULL) {
305 		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
306 		if (ret != 0) {
307 			SPDK_ERRLOG("Failed to setup process io channel\n");
308 			goto err;
309 		}
310 	} else {
311 		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
312 	}
313 
314 	return 0;
315 err:
316 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
317 		if (raid_ch->base_channel[i] != NULL) {
318 			spdk_put_io_channel(raid_ch->base_channel[i]);
319 		}
320 	}
321 	free(raid_ch->base_channel);
322 
323 	raid_bdev_ch_process_cleanup(raid_ch);
324 
325 	return ret;
326 }
327 
328 /*
329  * brief:
330  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
331  * hierarchy from raid bdev to base bdev io channels. It will be called per core
332  * params:
333  * io_device - pointer to raid bdev io device represented by raid_bdev
334  * ctx_buf - pointer to context buffer for raid bdev io channel
335  * returns:
336  * none
337  */
338 static void
339 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
340 {
341 	struct raid_bdev *raid_bdev = io_device;
342 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
343 	uint8_t i;
344 
345 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
346 
347 	assert(raid_ch != NULL);
348 	assert(raid_ch->base_channel);
349 
350 	if (raid_ch->module_channel) {
351 		spdk_put_io_channel(raid_ch->module_channel);
352 	}
353 
354 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
355 		/* Free base bdev channels */
356 		if (raid_ch->base_channel[i] != NULL) {
357 			spdk_put_io_channel(raid_ch->base_channel[i]);
358 		}
359 	}
360 	free(raid_ch->base_channel);
361 	raid_ch->base_channel = NULL;
362 
363 	raid_bdev_ch_process_cleanup(raid_ch);
364 }
365 
366 /*
367  * brief:
368  * raid_bdev_cleanup is used to cleanup raid_bdev related data
369  * structures.
370  * params:
371  * raid_bdev - pointer to raid_bdev
372  * returns:
373  * none
374  */
375 static void
376 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
377 {
378 	struct raid_base_bdev_info *base_info;
379 
380 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
381 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
382 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
383 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
384 
385 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
386 		assert(base_info->desc == NULL);
387 		free(base_info->name);
388 	}
389 
390 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
391 }
392 
/* Free the raid_bdev structure itself along with its superblock, base bdev
 * info array and name. Callers must have run raid_bdev_cleanup() first. */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_free_superblock(raid_bdev);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
401 
/* Convenience wrapper: unlink the raid bdev from global state, then free it. */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
408 
/* Mark a configured base bdev as no longer configured and drop it from the
 * discovered count. Also clears its process-target flag. */
static void
raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->is_configured);
	assert(raid_bdev->num_base_bdevs_discovered);
	raid_bdev->num_base_bdevs_discovered--;
	base_info->is_configured = false;
	base_info->is_process_target = false;
}
420 
/*
 * brief:
 * free resource of base bdev for raid bdev
 * params:
 * base_info - raid base bdev info
 * returns:
 * none
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	/* Must run on the app thread; no configure callback may be outstanding. */
	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->configure_cb == NULL);

	free(base_info->name);
	base_info->name = NULL;
	/* Keep the uuid while still configuring so the slot can be re-matched later. */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}
	base_info->is_failed = false;

	/* clear `data_offset` to allow it to be recalculated during configuration */
	base_info->data_offset = 0;

	/* Nothing more to release if the descriptor was never opened. */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release claim, close descriptor and drop the app-thread IO channel. */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		raid_bdev_deconfigure_base_bdev(base_info);
	}
}
461 
462 static void
463 raid_bdev_io_device_unregister_cb(void *io_device)
464 {
465 	struct raid_bdev *raid_bdev = io_device;
466 
467 	if (raid_bdev->num_base_bdevs_discovered == 0) {
468 		/* Free raid_bdev when there are no base bdevs left */
469 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
470 		raid_bdev_cleanup(raid_bdev);
471 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
472 		raid_bdev_free(raid_bdev);
473 	} else {
474 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
475 	}
476 }
477 
/* Called by raid level modules when their stop sequence completes; unregisters
 * the io_device unless the bdev never left the configuring state. */
void
raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
{
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
	}
}
485 
/* App-thread portion of raid bdev destruction: release base bdev resources
 * where appropriate, then stop the raid module. Completion is signalled via
 * raid_bdev_module_stop_done(), either directly or later by the module. */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	/* A running background process must have been stopped before destruct. */
	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	/* Give the module a chance to stop asynchronously; if stop() returns
	 * false the module will call raid_bdev_module_stop_done() itself. */
	if (raid_bdev->module->stop != NULL) {
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
518 
/* bdev fn_table destruct entry point: defer the real work to the app thread.
 * Returns 1 to indicate asynchronous completion (spdk_bdev_destruct_done). */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
526 
527 int
528 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
529 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
530 {
531 	struct spdk_dif_ctx dif_ctx;
532 	struct spdk_dif_error err_blk = {};
533 	int rc;
534 	struct spdk_dif_ctx_init_ext_opts dif_opts;
535 	struct iovec md_iov = {
536 		.iov_base	= md_buf,
537 		.iov_len	= num_blocks * bdev->md_len,
538 	};
539 
540 	if (md_buf == NULL) {
541 		return 0;
542 	}
543 
544 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
545 	dif_opts.dif_pi_format = bdev->dif_pi_format;
546 	rc = spdk_dif_ctx_init(&dif_ctx,
547 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
548 			       bdev->dif_is_head_of_md, bdev->dif_type,
549 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
550 			       0, 0, 0, 0, 0, &dif_opts);
551 	if (rc != 0) {
552 		SPDK_ERRLOG("Initialization of DIF context failed\n");
553 		return rc;
554 	}
555 
556 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
557 
558 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
559 	if (rc != 0) {
560 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
561 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
562 	}
563 
564 	return rc;
565 }
566 
567 int
568 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
569 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
570 {
571 	struct spdk_dif_ctx dif_ctx;
572 	struct spdk_dif_error err_blk = {};
573 	int rc;
574 	struct spdk_dif_ctx_init_ext_opts dif_opts;
575 	struct iovec md_iov = {
576 		.iov_base	= md_buf,
577 		.iov_len	= num_blocks * bdev->md_len,
578 	};
579 
580 	if (md_buf == NULL) {
581 		return 0;
582 	}
583 
584 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
585 	dif_opts.dif_pi_format = bdev->dif_pi_format;
586 	rc = spdk_dif_ctx_init(&dif_ctx,
587 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
588 			       bdev->dif_is_head_of_md, bdev->dif_type,
589 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
590 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
591 	if (rc != 0) {
592 		SPDK_ERRLOG("Initialization of DIF context failed\n");
593 		return rc;
594 	}
595 
596 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
597 	if (rc != 0) {
598 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
599 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
600 	}
601 
602 	return rc;
603 }
604 
/*
 * Complete a raid I/O with the given status. For split I/Os (see
 * raid_bdev_io_split) the first completion resubmits the remaining lower part
 * on the processed channel instead of completing; the final completion first
 * restores the original request fields. Reads on DIF-enabled bdevs get their
 * reference tags remapped before completion.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	spdk_trace_record(TRACE_BDEV_RAID_IO_DONE, 0, 0, (uintptr_t)raid_io, (uintptr_t)bdev_io);

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original offset/md_buf from the parent bdev_io. */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* raid_io->iovs currently points into the parent's iov array;
				 * the pointer difference is the number of iovs consumed. */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The boundary iov is shared by both halves: shrink it to
					 * cover only the first (lower) part. */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The lower part lies in the processed range. */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Final completion of a split I/O: restore original request fields. */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
670 
671 /*
672  * brief:
673  * raid_bdev_io_complete_part - signal the completion of a part of the expected
674  * base bdev IOs and complete the raid_io if this is the final expected IO.
675  * The caller should first set raid_io->base_bdev_io_remaining. This function
676  * will decrement this counter by the value of the 'completed' parameter and
677  * complete the raid_io if the counter reaches 0. The caller is free to
678  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
679  * it can represent e.g. blocks or IOs.
680  * params:
681  * raid_io - pointer to raid_bdev_io
682  * completed - the part of the raid_io that has been completed
683  * status - status of the base IO
684  * returns:
685  * true - if the raid_io is completed
686  * false - otherwise
687  */
688 bool
689 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
690 			   enum spdk_bdev_io_status status)
691 {
692 	assert(raid_io->base_bdev_io_remaining >= completed);
693 	raid_io->base_bdev_io_remaining -= completed;
694 
695 	if (status != raid_io->base_bdev_io_status_default) {
696 		raid_io->base_bdev_io_status = status;
697 	}
698 
699 	if (raid_io->base_bdev_io_remaining == 0) {
700 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
701 		return true;
702 	} else {
703 		return false;
704 	}
705 }
706 
/*
 * brief:
 * raid_bdev_queue_io_wait function processes the IO which failed to submit.
 * It will try to queue the IOs after storing the context to bdev wait queue logic.
 * params:
 * raid_io - pointer to raid_bdev_io
 * bdev - the block device that the IO is submitted to
 * ch - io channel
 * cb_fn - callback when the spdk_bdev_io for bdev becomes available
 * returns:
 * none
 */
void
raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
{
	/* The wait entry is embedded in the raid_io, so no allocation is needed. */
	raid_io->waitq_entry.bdev = bdev;
	raid_io->waitq_entry.cb_fn = cb_fn;
	raid_io->waitq_entry.cb_arg = raid_io;
	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
}
728 
729 static void
730 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
731 {
732 	struct raid_bdev_io *raid_io = cb_arg;
733 
734 	spdk_bdev_free_io(bdev_io);
735 
736 	raid_bdev_io_complete_part(raid_io, 1, success ?
737 				   SPDK_BDEV_IO_STATUS_SUCCESS :
738 				   SPDK_BDEV_IO_STATUS_FAILED);
739 }
740 
741 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
742 
/* Trampoline matching spdk_bdev_io_wait_cb so a reset that hit -ENOMEM can be
 * retried from the wait queue. */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	struct raid_bdev_io *raid_io = _raid_io;

	raid_bdev_submit_reset_request(raid_io);
}
750 
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* Initialize the remaining count only on first entry; on retry after
	 * -ENOMEM it is already set and must not be reset. */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from where the previous attempt left off. */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev: count its part as trivially successful. */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Retry the remaining submissions once a bdev_io frees up. */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
800 
/*
 * Split a raid I/O at split_offset (blocks from the start of the I/O) so that
 * the upper part [split_offset, end) is handled first. Advances offset/length/
 * md_buf and trims the iov array to cover only the upper part; the iov that
 * straddles the boundary (if any) is saved in split.iov/iov_copy so
 * raid_bdev_io_complete() can later submit the lower part and finally restore
 * the original request.
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iov boundary; nothing to save. */
				raid_io->split.iov = NULL;
			} else {
				/* Save the straddling iov and shrink it to its upper part. */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
838 
/*
 * Submit a read/write raid I/O, accounting for an active background process:
 * I/O entirely below the process offset uses the processed channel view, I/O
 * spanning the boundary is split, and I/O entirely above proceeds normally.
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
871 
872 /*
873  * brief:
874  * Callback function to spdk_bdev_io_get_buf.
875  * params:
876  * ch - pointer to raid bdev io channel
877  * bdev_io - pointer to parent bdev_io on raid bdev device
878  * success - True if buffer is allocated or false otherwise.
879  * returns:
880  * none
881  */
882 static void
883 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
884 		     bool success)
885 {
886 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
887 
888 	if (!success) {
889 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
890 		return;
891 	}
892 
893 	raid_io->iovs = bdev_io->u.bdev.iovs;
894 	raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
895 	raid_io->md_buf = bdev_io->u.bdev.md_buf;
896 
897 	raid_bdev_submit_rw_request(raid_io);
898 }
899 
900 void
901 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
902 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
903 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
904 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
905 {
906 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
907 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
908 
909 	raid_io->type = type;
910 	raid_io->offset_blocks = offset_blocks;
911 	raid_io->num_blocks = num_blocks;
912 	raid_io->iovs = iovs;
913 	raid_io->iovcnt = iovcnt;
914 	raid_io->memory_domain = memory_domain;
915 	raid_io->memory_domain_ctx = memory_domain_ctx;
916 	raid_io->md_buf = md_buf;
917 
918 	raid_io->raid_bdev = raid_bdev;
919 	raid_io->raid_ch = raid_ch;
920 	raid_io->base_bdev_io_remaining = 0;
921 	raid_io->base_bdev_io_submitted = 0;
922 	raid_io->completion_cb = NULL;
923 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
924 
925 	raid_bdev_io_set_default_status(raid_io, SPDK_BDEV_IO_STATUS_SUCCESS);
926 }
927 
/*
 * brief:
 * raid_bdev_submit_request function is the submit_request function pointer of
 * raid bdev function table. This is used to submit the io on raid_bdev to below
 * layers.
 * params:
 * ch - pointer to raid bdev io channel
 * bdev_io - pointer to parent bdev_io on raid bdev device
 * returns:
 * none
 */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	spdk_trace_record(TRACE_BDEV_RAID_IO_START, 0, 0, (uintptr_t)raid_io, (uintptr_t)bdev_io);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need a buffer allocated by the bdev layer first. */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload requests are not supported while a process runs. */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
980 
981 /*
982  * brief:
983  * _raid_bdev_io_type_supported checks whether io_type is supported in
984  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
985  * doesn't support, the raid device doesn't supports.
986  *
987  * params:
988  * raid_bdev - pointer to raid bdev context
989  * io_type - io type
990  * returns:
991  * true - io_type is supported
992  * false - io_type is not supported
993  */
994 inline static bool
995 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
996 {
997 	struct raid_base_bdev_info *base_info;
998 
999 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
1000 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
1001 		if (raid_bdev->module->submit_null_payload_request == NULL) {
1002 			return false;
1003 		}
1004 	}
1005 
1006 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1007 		if (base_info->desc == NULL) {
1008 			continue;
1009 		}
1010 
1011 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
1012 			return false;
1013 		}
1014 	}
1015 
1016 	return true;
1017 }
1018 
1019 /*
1020  * brief:
1021  * raid_bdev_io_type_supported is the io_supported function for bdev function
1022  * table which returns whether the particular io type is supported or not by
1023  * raid bdev module
1024  * params:
1025  * ctx - pointer to raid bdev context
1026  * type - io type
1027  * returns:
1028  * true - io_type is supported
1029  * false - io_type is not supported
1030  */
1031 static bool
1032 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1033 {
1034 	switch (io_type) {
1035 	case SPDK_BDEV_IO_TYPE_READ:
1036 	case SPDK_BDEV_IO_TYPE_WRITE:
1037 		return true;
1038 
1039 	case SPDK_BDEV_IO_TYPE_FLUSH:
1040 	case SPDK_BDEV_IO_TYPE_RESET:
1041 	case SPDK_BDEV_IO_TYPE_UNMAP:
1042 		return _raid_bdev_io_type_supported(ctx, io_type);
1043 
1044 	default:
1045 		return false;
1046 	}
1047 
1048 	return false;
1049 }
1050 
/*
 * brief:
 * raid_bdev_get_io_channel is the get_io_channel function table pointer for
 * raid bdev. It returns a per-thread IO channel for this raid bdev.
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * pointer to io channel for raid bdev
 */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev pointer itself is the registered io_device */
	return spdk_get_io_channel(ctxt);
}
1067 
/* Write the raid bdev's configuration and runtime state as named JSON fields
 * into an already-open JSON object. Must run on the app thread. */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;

	assert(raid_bdev != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	if (raid_bdev->process) {
		/* Describe the in-progress background process (e.g. rebuild) */
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		/* Percent computed in floating point, implicitly truncated to uint32 */
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			/* Slot currently has no base bdev (e.g. removed device) */
			spdk_json_write_null(w);
		}
		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
1117 
1118 /*
1119  * brief:
1120  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1121  * params:
1122  * ctx - pointer to raid_bdev
1123  * w - pointer to json context
1124  * returns:
1125  * 0 - success
1126  * non zero - failure
1127  */
1128 static int
1129 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1130 {
1131 	struct raid_bdev *raid_bdev = ctx;
1132 
1133 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1134 
1135 	/* Dump the raid bdev configuration related information */
1136 	spdk_json_write_named_object_begin(w, "raid");
1137 	raid_bdev_write_info_json(raid_bdev, w);
1138 	spdk_json_write_object_end(w);
1139 
1140 	return 0;
1141 }
1142 
/*
 * brief:
 * raid_bdev_write_config_json is the function table pointer for raid bdev.
 * It emits a "bdev_raid_create" RPC entry that would recreate this raid bdev.
 * params:
 * bdev - pointer to spdk_bdev
 * w - pointer to json context
 * returns:
 * none
 */
static void
raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct raid_bdev *raid_bdev = bdev->ctxt;
	struct raid_base_bdev_info *base_info;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->superblock_enabled) {
		/* raid bdev configuration is stored in the superblock */
		return;
	}

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	/* strip_size_kb is omitted when 0 (e.g. raid1, which does not stripe) */
	if (raid_bdev->strip_size_kb != 0) {
		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	}
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));

	spdk_json_write_named_array_begin(w, "base_bdevs");
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			/* Placeholder name for a slot whose base bdev was removed */
			char str[32];

			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
			spdk_json_write_string(w, str);
		}
	}
	spdk_json_write_array_end(w);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1193 
1194 static int
1195 raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1196 {
1197 	struct raid_bdev *raid_bdev = ctx;
1198 	struct raid_base_bdev_info *base_info;
1199 	int domains_count = 0, rc = 0;
1200 
1201 	if (raid_bdev->module->memory_domains_supported == false) {
1202 		return 0;
1203 	}
1204 
1205 	/* First loop to get the number of memory domains */
1206 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1207 		if (base_info->is_configured == false) {
1208 			continue;
1209 		}
1210 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
1211 		if (rc < 0) {
1212 			return rc;
1213 		}
1214 		domains_count += rc;
1215 	}
1216 
1217 	if (!domains || array_size < domains_count) {
1218 		return domains_count;
1219 	}
1220 
1221 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1222 		if (base_info->is_configured == false) {
1223 			continue;
1224 		}
1225 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
1226 		if (rc < 0) {
1227 			return rc;
1228 		}
1229 		domains += rc;
1230 		array_size -= rc;
1231 	}
1232 
1233 	return domains_count;
1234 }
1235 
/* g_raid_bdev_fn_table is the function table for raid bdev: the callbacks the
 * generic bdev layer invokes for IO submission, channel management and JSON
 * config/info dumping. */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1246 
1247 struct raid_bdev *
1248 raid_bdev_find_by_name(const char *name)
1249 {
1250 	struct raid_bdev *raid_bdev;
1251 
1252 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1253 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1254 			return raid_bdev;
1255 		}
1256 	}
1257 
1258 	return NULL;
1259 }
1260 
1261 static struct raid_bdev *
1262 raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1263 {
1264 	struct raid_bdev *raid_bdev;
1265 
1266 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1267 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1268 			return raid_bdev;
1269 		}
1270 	}
1271 
1272 	return NULL;
1273 }
1274 
/* Mapping of accepted raid level strings (long and short form) to enum values.
 * Terminated by a zero-initialized sentinel entry. */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};

/* Printable names for raid bdev states, indexed by enum raid_bdev_state */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};

/* Printable names for background process types, indexed by enum raid_process_type */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};

/* We have to use the typedef in the function declaration to appease astyle. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1305 
1306 raid_level_t
1307 raid_bdev_str_to_level(const char *str)
1308 {
1309 	unsigned int i;
1310 
1311 	assert(str != NULL);
1312 
1313 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1314 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1315 			return g_raid_level_names[i].value;
1316 		}
1317 	}
1318 
1319 	return INVALID_RAID_LEVEL;
1320 }
1321 
1322 const char *
1323 raid_bdev_level_to_str(enum raid_level level)
1324 {
1325 	unsigned int i;
1326 
1327 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1328 		if (g_raid_level_names[i].value == level) {
1329 			return g_raid_level_names[i].name;
1330 		}
1331 	}
1332 
1333 	return "";
1334 }
1335 
1336 raid_bdev_state_t
1337 raid_bdev_str_to_state(const char *str)
1338 {
1339 	unsigned int i;
1340 
1341 	assert(str != NULL);
1342 
1343 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1344 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1345 			break;
1346 		}
1347 	}
1348 
1349 	return i;
1350 }
1351 
1352 const char *
1353 raid_bdev_state_to_str(enum raid_bdev_state state)
1354 {
1355 	if (state >= RAID_BDEV_STATE_MAX) {
1356 		return "";
1357 	}
1358 
1359 	return g_raid_state_names[state];
1360 }
1361 
1362 const char *
1363 raid_bdev_process_to_str(enum raid_process_type value)
1364 {
1365 	if (value >= RAID_PROCESS_MAX) {
1366 		return "";
1367 	}
1368 
1369 	return g_raid_process_type_names[value];
1370 }
1371 
1372 /*
1373  * brief:
1374  * raid_bdev_fini_start is called when bdev layer is starting the
1375  * shutdown process
1376  * params:
1377  * none
1378  * returns:
1379  * none
1380  */
1381 static void
1382 raid_bdev_fini_start(void)
1383 {
1384 	struct raid_bdev *raid_bdev;
1385 	struct raid_base_bdev_info *base_info;
1386 
1387 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1388 
1389 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1390 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1391 			RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1392 				raid_bdev_free_base_bdev_resource(base_info);
1393 			}
1394 		}
1395 	}
1396 
1397 	g_shutdown_started = true;
1398 }
1399 
1400 /*
1401  * brief:
1402  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1403  * params:
1404  * none
1405  * returns:
1406  * none
1407  */
1408 static void
1409 raid_bdev_exit(void)
1410 {
1411 	struct raid_bdev *raid_bdev, *tmp;
1412 
1413 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1414 
1415 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1416 		raid_bdev_cleanup_and_free(raid_bdev);
1417 	}
1418 }
1419 
/* Emit a "bdev_raid_set_options" RPC entry reproducing the current module
 * options into the configuration JSON. */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_named_uint32(w, "process_max_bandwidth_mb_sec",
				     g_opts.process_max_bandwidth_mb_sec);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1435 
/* config_json callback of the raid module: dump module-level options.
 * Always succeeds. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1443 
/*
 * brief:
 * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
 * module
 * params:
 * none
 * returns:
 * size of spdk_bdev_io context for raid
 */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	/* The bdev layer allocates this much driver_ctx space per bdev_io */
	return sizeof(struct raid_bdev_io);
}
1459 
/* Registration of the raid module with the bdev layer; these callbacks are
 * invoked on init, shutdown, config dump and disk examination. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1472 
/*
 * brief:
 * raid_bdev_init is the initialization function for raid bdev module
 * params:
 * none
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_init(void)
{
	/* No module-level setup is currently required */
	return 0;
}
1487 
1488 static int
1489 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1490 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1491 		  struct raid_bdev **raid_bdev_out)
1492 {
1493 	struct raid_bdev *raid_bdev;
1494 	struct spdk_bdev *raid_bdev_gen;
1495 	struct raid_bdev_module *module;
1496 	struct raid_base_bdev_info *base_info;
1497 	uint8_t min_operational;
1498 
1499 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1500 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1501 		return -EINVAL;
1502 	}
1503 
1504 	if (raid_bdev_find_by_name(name) != NULL) {
1505 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1506 		return -EEXIST;
1507 	}
1508 
1509 	if (level == RAID1) {
1510 		if (strip_size != 0) {
1511 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1512 			return -EINVAL;
1513 		}
1514 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1515 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1516 		return -EINVAL;
1517 	}
1518 
1519 	module = raid_bdev_module_find(level);
1520 	if (module == NULL) {
1521 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1522 		return -EINVAL;
1523 	}
1524 
1525 	assert(module->base_bdevs_min != 0);
1526 	if (num_base_bdevs < module->base_bdevs_min) {
1527 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1528 			    module->base_bdevs_min,
1529 			    raid_bdev_level_to_str(level));
1530 		return -EINVAL;
1531 	}
1532 
1533 	switch (module->base_bdevs_constraint.type) {
1534 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1535 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1536 		break;
1537 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1538 		min_operational = module->base_bdevs_constraint.value;
1539 		break;
1540 	case CONSTRAINT_UNSET:
1541 		if (module->base_bdevs_constraint.value != 0) {
1542 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1543 				    (uint8_t)module->base_bdevs_constraint.value, name);
1544 			return -EINVAL;
1545 		}
1546 		min_operational = num_base_bdevs;
1547 		break;
1548 	default:
1549 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1550 			    (uint8_t)module->base_bdevs_constraint.type,
1551 			    raid_bdev_level_to_str(module->level));
1552 		return -EINVAL;
1553 	};
1554 
1555 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1556 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1557 			    raid_bdev_level_to_str(module->level));
1558 		return -EINVAL;
1559 	}
1560 
1561 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1562 	if (!raid_bdev) {
1563 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1564 		return -ENOMEM;
1565 	}
1566 
1567 	raid_bdev->module = module;
1568 	raid_bdev->num_base_bdevs = num_base_bdevs;
1569 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1570 					   sizeof(struct raid_base_bdev_info));
1571 	if (!raid_bdev->base_bdev_info) {
1572 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1573 		raid_bdev_free(raid_bdev);
1574 		return -ENOMEM;
1575 	}
1576 
1577 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1578 		base_info->raid_bdev = raid_bdev;
1579 	}
1580 
1581 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1582 	 * internally and set later.
1583 	 */
1584 	raid_bdev->strip_size = 0;
1585 	raid_bdev->strip_size_kb = strip_size;
1586 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1587 	raid_bdev->level = level;
1588 	raid_bdev->min_base_bdevs_operational = min_operational;
1589 	raid_bdev->superblock_enabled = superblock_enabled;
1590 
1591 	raid_bdev_gen = &raid_bdev->bdev;
1592 
1593 	raid_bdev_gen->name = strdup(name);
1594 	if (!raid_bdev_gen->name) {
1595 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1596 		raid_bdev_free(raid_bdev);
1597 		return -ENOMEM;
1598 	}
1599 
1600 	raid_bdev_gen->product_name = "Raid Volume";
1601 	raid_bdev_gen->ctxt = raid_bdev;
1602 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1603 	raid_bdev_gen->module = &g_raid_if;
1604 	raid_bdev_gen->write_cache = 0;
1605 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1606 
1607 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1608 
1609 	*raid_bdev_out = raid_bdev;
1610 
1611 	return 0;
1612 }
1613 
1614 /*
1615  * brief:
1616  * raid_bdev_create allocates raid bdev based on passed configuration
1617  * params:
1618  * name - name for raid bdev
1619  * strip_size - strip size in KB
1620  * num_base_bdevs - number of base bdevs
1621  * level - raid level
1622  * superblock_enabled - true if raid should have superblock
1623  * uuid - uuid to set for the bdev
1624  * raid_bdev_out - the created raid bdev
1625  * returns:
1626  * 0 - success
1627  * non zero - failure
1628  */
1629 int
1630 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1631 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1632 		 struct raid_bdev **raid_bdev_out)
1633 {
1634 	struct raid_bdev *raid_bdev;
1635 	int rc;
1636 
1637 	assert(uuid != NULL);
1638 
1639 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1640 			       &raid_bdev);
1641 	if (rc != 0) {
1642 		return rc;
1643 	}
1644 
1645 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1646 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1647 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1648 	}
1649 
1650 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1651 
1652 	*raid_bdev_out = raid_bdev;
1653 
1654 	return 0;
1655 }
1656 
1657 static void
1658 _raid_bdev_unregistering_cont(void *ctx)
1659 {
1660 	struct raid_bdev *raid_bdev = ctx;
1661 
1662 	spdk_bdev_close(raid_bdev->self_desc);
1663 	raid_bdev->self_desc = NULL;
1664 }
1665 
/* Forward unregistration continuation to the app thread, where the internal
 * descriptor was opened and must be closed. */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1671 
1672 static int
1673 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1674 {
1675 	struct raid_process_finish_action *finish_action;
1676 
1677 	assert(spdk_get_thread() == process->thread);
1678 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1679 
1680 	finish_action = calloc(1, sizeof(*finish_action));
1681 	if (finish_action == NULL) {
1682 		return -ENOMEM;
1683 	}
1684 
1685 	finish_action->cb = cb;
1686 	finish_action->cb_ctx = cb_ctx;
1687 
1688 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1689 
1690 	return 0;
1691 }
1692 
1693 static void
1694 raid_bdev_unregistering_stop_process(void *ctx)
1695 {
1696 	struct raid_bdev_process *process = ctx;
1697 	struct raid_bdev *raid_bdev = process->raid_bdev;
1698 	int rc;
1699 
1700 	process->state = RAID_PROCESS_STATE_STOPPING;
1701 	if (process->status == 0) {
1702 		process->status = -ECANCELED;
1703 	}
1704 
1705 	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
1706 	if (rc != 0) {
1707 		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
1708 			    raid_bdev->bdev.name, spdk_strerror(-rc));
1709 	}
1710 }
1711 
1712 static void
1713 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1714 {
1715 	struct raid_bdev *raid_bdev = event_ctx;
1716 
1717 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1718 		if (raid_bdev->process != NULL) {
1719 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1720 					     raid_bdev->process);
1721 		} else {
1722 			raid_bdev_unregistering_cont(raid_bdev);
1723 		}
1724 	}
1725 }
1726 
/* Final stage of raid bdev configuration: register the io_device and the bdev
 * with the bdev layer, open an internal descriptor, and report the result via
 * configure_cb. On failure every step is unwound and the bdev returns to
 * CONFIGURING state. */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto out;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		/* Undo the successful registration before unwinding below */
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto out;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
out:
	if (rc != 0) {
		/* Unwind: stop the module, drop the io_device, revert the state */
		if (raid_bdev->module->stop != NULL) {
			raid_bdev->module->stop(raid_bdev);
		}
		spdk_io_device_unregister(raid_bdev, NULL);
		raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
	}

	if (raid_bdev->configure_cb != NULL) {
		raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, rc);
		raid_bdev->configure_cb = NULL;
	}
}
1781 
1782 static void
1783 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1784 {
1785 	if (status == 0) {
1786 		raid_bdev_configure_cont(raid_bdev);
1787 	} else {
1788 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1789 			    raid_bdev->bdev.name, spdk_strerror(-status));
1790 		if (raid_bdev->module->stop != NULL) {
1791 			raid_bdev->module->stop(raid_bdev);
1792 		}
1793 		if (raid_bdev->configure_cb != NULL) {
1794 			raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, status);
1795 			raid_bdev->configure_cb = NULL;
1796 		}
1797 	}
1798 }
1799 
/*
 * brief:
 * raid_bdev_configure finalizes the raid bdev configuration once all base
 * bdevs are discovered: it converts the strip size to blocks, starts the raid
 * module, optionally creates/validates and writes the superblock, and then
 * registers the bdev with the bdev layer (via raid_bdev_configure_cont).
 * params:
 * raid_bdev - pointer to raid bdev
 * cb - callback invoked with the final status (may be deferred)
 * cb_ctx - context passed to cb
 * returns:
 * 0 - success (cb will be called later with the final status)
 * non zero - failure (cb will not be called)
 */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev, raid_bdev_configure_cb cb, void *cb_ctx)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);

	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	assert(raid_bdev->configure_cb == NULL);
	raid_bdev->configure_cb = cb;
	raid_bdev->configure_cb_ctx = cb_ctx;

	if (raid_bdev->superblock_enabled) {
		if (raid_bdev->sb == NULL) {
			/* Fresh raid bdev: create and initialize a new superblock */
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			/* Existing superblock (loaded from disk): sanity-check it */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			/* Undo module start; the caller gets the error synchronously */
			raid_bdev->configure_cb = NULL;
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		/* Registration continues in the superblock write completion */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1874 
1875 /*
1876  * brief:
1877  * If raid bdev is online and registered, change the bdev state to
1878  * configuring and unregister this raid device. Queue this raid device
1879  * in configuring list
1880  * params:
1881  * raid_bdev - pointer to raid bdev
1882  * cb_fn - callback function
1883  * cb_arg - argument to callback function
1884  * returns:
1885  * none
1886  */
1887 static void
1888 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1889 		      void *cb_arg)
1890 {
1891 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1892 		if (cb_fn) {
1893 			cb_fn(cb_arg, 0);
1894 		}
1895 		return;
1896 	}
1897 
1898 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1899 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1900 
1901 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1902 }
1903 
1904 /*
1905  * brief:
1906  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1907  * params:
1908  * base_bdev - pointer to base bdev
1909  * returns:
1910  * base bdev info if found, otherwise NULL.
1911  */
1912 static struct raid_base_bdev_info *
1913 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1914 {
1915 	struct raid_bdev *raid_bdev;
1916 	struct raid_base_bdev_info *base_info;
1917 
1918 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1919 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1920 			if (base_info->desc != NULL &&
1921 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1922 				return base_info;
1923 			}
1924 		}
1925 	}
1926 
1927 	return NULL;
1928 }
1929 
1930 static void
1931 raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
1932 {
1933 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1934 
1935 	assert(base_info->remove_scheduled);
1936 	base_info->remove_scheduled = false;
1937 
1938 	if (status == 0) {
1939 		raid_bdev->num_base_bdevs_operational--;
1940 		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
1941 			/* There is not enough base bdevs to keep the raid bdev operational. */
1942 			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
1943 			return;
1944 		}
1945 	}
1946 
1947 	if (base_info->remove_cb != NULL) {
1948 		base_info->remove_cb(base_info->remove_cb_ctx, status);
1949 	}
1950 }
1951 
1952 static void
1953 raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
1954 {
1955 	struct raid_base_bdev_info *base_info = ctx;
1956 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1957 
1958 	if (status != 0) {
1959 		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
1960 			    raid_bdev->bdev.name, spdk_strerror(-status));
1961 	}
1962 
1963 	raid_bdev_remove_base_bdev_done(base_info, status);
1964 }
1965 
1966 static void
1967 raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
1968 {
1969 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1970 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1971 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
1972 	uint8_t idx = raid_bdev_base_bdev_slot(base_info);
1973 
1974 	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);
1975 
1976 	if (raid_ch->base_channel[idx] != NULL) {
1977 		spdk_put_io_channel(raid_ch->base_channel[idx]);
1978 		raid_ch->base_channel[idx] = NULL;
1979 	}
1980 
1981 	if (raid_ch->process.ch_processed != NULL) {
1982 		raid_ch->process.ch_processed->base_channel[idx] = NULL;
1983 	}
1984 
1985 	spdk_for_each_channel_continue(i, 0);
1986 }
1987 
1988 static void
1989 raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
1990 {
1991 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1992 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1993 
1994 	raid_bdev_free_base_bdev_resource(base_info);
1995 
1996 	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
1997 			    base_info);
1998 }
1999 
2000 static void
2001 raid_bdev_remove_base_bdev_cont(struct raid_base_bdev_info *base_info)
2002 {
2003 	raid_bdev_deconfigure_base_bdev(base_info);
2004 
2005 	spdk_for_each_channel(base_info->raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
2006 			      raid_bdev_channels_remove_base_bdev_done);
2007 }
2008 
2009 static void
2010 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2011 {
2012 	struct raid_base_bdev_info *base_info = ctx;
2013 
2014 	if (status != 0) {
2015 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
2016 			    raid_bdev->bdev.name, spdk_strerror(-status));
2017 		raid_bdev_remove_base_bdev_done(base_info, status);
2018 		return;
2019 	}
2020 
2021 	raid_bdev_remove_base_bdev_cont(base_info);
2022 }
2023 
/*
 * Quiesce completion during base bdev removal. If the raid bdev has a
 * superblock, mark the removed base bdev's slot as FAILED or MISSING and
 * persist the superblock before continuing the removal; otherwise continue
 * the removal directly.
 */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry for this base bdev's slot */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				if (base_info->is_failed) {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;
				} else {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_MISSING;
				}

				/* The removal continues in the write completion callback */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}

	raid_bdev_remove_base_bdev_cont(base_info);
}
2061 
2062 static int
2063 raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
2064 {
2065 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2066 
2067 	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
2068 				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
2069 }
2070 
/* Message context for removing a base bdev while a background process is running */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The raid bdev's background process at the time the removal was requested */
	struct raid_bdev_process *process;
	/* The base bdev being removed */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of the operational base bdev count, taken on the app thread */
	uint8_t num_base_bdevs_operational;
};
2076 
/*
 * Continue a base bdev removal on the app thread by quiescing the raid bdev.
 * A quiesce failure completes the removal immediately with that error.
 */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc = raid_bdev_remove_base_bdev_quiesce(base_info);

	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2088 
2089 static void
2090 raid_bdev_process_base_bdev_remove_cont(void *_ctx)
2091 {
2092 	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2093 	struct raid_base_bdev_info *base_info = ctx->base_info;
2094 
2095 	free(ctx);
2096 
2097 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
2098 			     base_info);
2099 }
2100 
/*
 * Runs on the process thread. Decide whether the base bdev removal can
 * proceed while the background process keeps running: if the removed bdev is
 * not the process target and enough base bdevs remain operational, continue
 * immediately; otherwise stop the process and defer the removal until it has
 * fully stopped.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Continue the removal after the process has fully stopped */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
2131 
/*
 * Schedule removal of a base bdev while a background process is running.
 * Gathers the state the process thread needs on the app thread and sends it
 * in a message. Returns 0 on success, -ENOMEM on allocation failure.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (base_info->is_configured && !base_info->remove_scheduled) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2167 
/*
 * Remove a base bdev from its raid bdev. Depending on the raid bdev's state
 * this either cleans up immediately (raid bdev not online), deconfigures the
 * whole raid bdev (removal not tolerated), or removes the base bdev
 * asynchronously via quiesce. cb_fn, if not NULL, is called when the removal
 * completes. Returns 0 on success or a negative errno. App thread only.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Reject duplicate or premature removal requests */
	if (base_info->remove_scheduled || !base_info->is_configured) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0 &&
		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		/* Let a running background process coordinate the removal */
		if (raid_bdev->process != NULL) {
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2225 
2226 /*
2227  * brief:
2228  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2229  * is removed. This function checks if this base bdev is part of any raid bdev
2230  * or not. If yes, it takes necessary action on that particular raid bdev.
2231  * params:
2232  * base_bdev - pointer to base bdev which got removed
2233  * cb_fn - callback function
2234  * cb_arg - argument to callback function
2235  * returns:
2236  * 0 - success
2237  * non zero - failure
2238  */
2239 int
2240 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2241 {
2242 	struct raid_base_bdev_info *base_info;
2243 
2244 	/* Find the raid_bdev which has claimed this base_bdev */
2245 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2246 	if (!base_info) {
2247 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2248 		return -ENODEV;
2249 	}
2250 
2251 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2252 }
2253 
2254 static void
2255 raid_bdev_fail_base_remove_cb(void *ctx, int status)
2256 {
2257 	struct raid_base_bdev_info *base_info = ctx;
2258 
2259 	if (status != 0) {
2260 		SPDK_WARNLOG("Failed to remove base bdev %s\n", base_info->name);
2261 		base_info->is_failed = false;
2262 	}
2263 }
2264 
/*
 * Mark a base bdev as failed and start removing it from the raid bdev.
 * Runs on the app thread.
 */
static void
_raid_bdev_fail_base_bdev(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc;

	/* Already marked failed - nothing more to do */
	if (base_info->is_failed) {
		return;
	}
	base_info->is_failed = true;

	SPDK_NOTICELOG("Failing base bdev in slot %d ('%s') of raid bdev '%s'\n",
		       raid_bdev_base_bdev_slot(base_info), base_info->name, base_info->raid_bdev->bdev.name);

	/* On immediate failure, invoke the callback directly to roll back the flag */
	rc = _raid_bdev_remove_base_bdev(base_info, raid_bdev_fail_base_remove_cb, base_info);
	if (rc != 0) {
		raid_bdev_fail_base_remove_cb(base_info, rc);
	}
}
2284 
/*
 * Fail a base bdev and remove it from the raid bdev. May be called from any
 * thread; the work is executed on the app thread.
 */
void
raid_bdev_fail_base_bdev(struct raid_base_bdev_info *base_info)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_fail_base_bdev, base_info);
}
2290 
2291 static void
2292 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2293 {
2294 	if (status != 0) {
2295 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2296 			    raid_bdev->bdev.name, spdk_strerror(-status));
2297 	}
2298 }
2299 
2300 /*
2301  * brief:
2302  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2303  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2304  * If yes, call module handler to resize the raid_bdev if implemented.
2305  * params:
2306  * base_bdev - pointer to base bdev which got resized.
2307  * returns:
2308  * none
2309  */
2310 static void
2311 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2312 {
2313 	struct raid_bdev *raid_bdev;
2314 	struct raid_base_bdev_info *base_info;
2315 	uint64_t blockcnt_old;
2316 
2317 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2318 
2319 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2320 
2321 	/* Find the raid_bdev which has claimed this base_bdev */
2322 	if (!base_info) {
2323 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2324 		return;
2325 	}
2326 	raid_bdev = base_info->raid_bdev;
2327 
2328 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2329 
2330 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2331 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2332 
2333 	base_info->blockcnt = base_bdev->blockcnt;
2334 
2335 	if (!raid_bdev->module->resize) {
2336 		return;
2337 	}
2338 
2339 	blockcnt_old = raid_bdev->bdev.blockcnt;
2340 	if (raid_bdev->module->resize(raid_bdev) == false) {
2341 		return;
2342 	}
2343 
2344 	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
2345 		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);
2346 
2347 	if (raid_bdev->superblock_enabled) {
2348 		struct raid_bdev_superblock *sb = raid_bdev->sb;
2349 		uint8_t i;
2350 
2351 		for (i = 0; i < sb->base_bdevs_size; i++) {
2352 			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
2353 
2354 			if (sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2355 				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2356 				sb_base_bdev->data_size = base_info->data_size;
2357 			}
2358 		}
2359 		sb->raid_size = raid_bdev->bdev.blockcnt;
2360 		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
2361 	}
2362 }
2363 
2364 /*
2365  * brief:
2366  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2367  * triggers asynchronous event.
2368  * params:
2369  * type - event details.
2370  * bdev - bdev that triggered event.
2371  * event_ctx - context for event.
2372  * returns:
2373  * none
2374  */
2375 static void
2376 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2377 			  void *event_ctx)
2378 {
2379 	int rc;
2380 
2381 	switch (type) {
2382 	case SPDK_BDEV_EVENT_REMOVE:
2383 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2384 		if (rc != 0) {
2385 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2386 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2387 		}
2388 		break;
2389 	case SPDK_BDEV_EVENT_RESIZE:
2390 		raid_bdev_resize_base_bdev(bdev);
2391 		break;
2392 	default:
2393 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2394 		break;
2395 	}
2396 }
2397 
2398 /*
2399  * brief:
2400  * Deletes the specified raid bdev
2401  * params:
2402  * raid_bdev - pointer to raid bdev
2403  * cb_fn - callback function
2404  * cb_arg - argument to callback function
2405  */
2406 void
2407 raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
2408 {
2409 	struct raid_base_bdev_info *base_info;
2410 
2411 	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
2412 
2413 	if (raid_bdev->destroy_started) {
2414 		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
2415 			      raid_bdev->bdev.name);
2416 		if (cb_fn) {
2417 			cb_fn(cb_arg, -EALREADY);
2418 		}
2419 		return;
2420 	}
2421 
2422 	raid_bdev->destroy_started = true;
2423 
2424 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2425 		base_info->remove_scheduled = true;
2426 
2427 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2428 			/*
2429 			 * As raid bdev is not registered yet or already unregistered,
2430 			 * so cleanup should be done here itself.
2431 			 */
2432 			raid_bdev_free_base_bdev_resource(base_info);
2433 		}
2434 	}
2435 
2436 	if (raid_bdev->num_base_bdevs_discovered == 0) {
2437 		/* There is no base bdev for this raid, so free the raid device. */
2438 		raid_bdev_cleanup_and_free(raid_bdev);
2439 		if (cb_fn) {
2440 			cb_fn(cb_arg, 0);
2441 		}
2442 	} else {
2443 		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
2444 	}
2445 }
2446 
2447 static void
2448 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2449 {
2450 	if (status != 0) {
2451 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2452 			    raid_bdev->bdev.name, spdk_strerror(-status));
2453 	}
2454 }
2455 
/*
 * Persist the superblock after a background process finished successfully:
 * any base bdev that is now configured gets its superblock entry promoted to
 * CONFIGURED with its current data offset and uuid. Runs on the app thread.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				sb_base_bdev->data_offset = base_info->data_offset;
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
2481 
2482 static void raid_bdev_process_free(struct raid_bdev_process *process);
2483 
/*
 * Final step of stopping a background process, run on the process thread:
 * execute all registered finish actions, free the process and exit the
 * dedicated process thread.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_process_finish_action *finish_action;

	/* Drain and run the deferred finish actions in FIFO order */
	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
		finish_action->cb(finish_action->cb_ctx);
		free(finish_action);
	}

	spdk_poller_unregister(&process->qos.process_continue_poller);

	raid_bdev_process_free(process);

	/* The process thread was created solely for this process - tear it down */
	spdk_thread_exit(spdk_get_thread());
}
2502 
/*
 * Completion of removing the process target after a failed process. Any
 * removal error is only logged; the process teardown continues regardless.
 */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2514 
/*
 * Unquiesce completion during process teardown. If the process failed, its
 * target base bdev is removed from the raid bdev before finishing; otherwise
 * teardown proceeds directly on the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		/* The process failed - remove its target and finish afterwards */
		status = _raid_bdev_remove_base_bdev(process->target, raid_bdev_process_finish_target_removed,
						     process);
		if (status != 0) {
			raid_bdev_process_finish_target_removed(process, status);
		}
		return;
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2535 
2536 static void
2537 raid_bdev_process_finish_unquiesce(void *ctx)
2538 {
2539 	struct raid_bdev_process *process = ctx;
2540 	int rc;
2541 
2542 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2543 				 raid_bdev_process_finish_unquiesced, process);
2544 	if (rc != 0) {
2545 		raid_bdev_process_finish_unquiesced(process, rc);
2546 	}
2547 }
2548 
/*
 * Runs on the process thread after all channels dropped their process state.
 * Releases the process IO channel, logs the outcome, triggers a superblock
 * update on success and hands control to the app thread to unquiesce.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		/* Superblock updates must happen on the app thread */
		if (raid_bdev->superblock_enabled) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2580 
/*
 * Channel-iteration completion for process teardown: continue finishing on
 * the process thread.
 */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2588 
/*
 * Per-channel teardown of process state. On success the target's channel
 * replaces the base bdev channel in the target's slot; in all cases the
 * channel's process resources are cleaned up.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* Hand the process target channel over to the regular slot;
		 * clearing target_ch keeps cleanup from releasing it */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2607 
/*
 * Quiesce completion during process teardown (app thread). Detaches the
 * process from the raid bdev and tears down process state on every channel.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): on quiesce failure the teardown stops here and the
		 * process is never finished - confirm this is intended. */
		return;
	}

	raid_bdev->process = NULL;
	process->target->is_process_target = false;

	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2625 
2626 static void
2627 _raid_bdev_process_finish(void *ctx)
2628 {
2629 	struct raid_bdev_process *process = ctx;
2630 	int rc;
2631 
2632 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2633 			       raid_bdev_process_finish_quiesced, process);
2634 	if (rc != 0) {
2635 		raid_bdev_process_finish_quiesced(ctx, rc);
2636 	}
2637 }
2638 
/* Kick off process teardown; the quiesce step must run on the app thread. */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2644 
2645 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2646 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2647 
/*
 * Request the background process to finish with the given status (the first
 * non-zero status wins). Transitions the process to STOPPING; the actual
 * teardown happens once the current window is unlocked. Process thread only.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	/* Already stopping or stopped - nothing more to do */
	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2670 
/*
 * Completion of unlocking the current process window. On success, advance
 * the window offset and continue with the next iteration of the process.
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	/* Advance past the window that was just processed */
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2687 
2688 static void
2689 raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
2690 {
2691 	int rc;
2692 
2693 	assert(process->window_range_locked == true);
2694 
2695 	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
2696 				       process->window_offset, process->max_window_size,
2697 				       raid_bdev_process_window_range_unlocked, process);
2698 	if (rc != 0) {
2699 		raid_bdev_process_window_range_unlocked(process, rc);
2700 	}
2701 }
2702 
/*
 * All channels have been updated with the new process offset - the locked
 * window range can now be released.
 */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2710 
2711 static void
2712 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2713 {
2714 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2715 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2716 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2717 
2718 	raid_ch->process.offset = process->window_offset + process->window_size;
2719 
2720 	spdk_for_each_channel_continue(i, 0);
2721 }
2722 
/*
 * Complete one process request. Returns the request to the free list,
 * records the first failure status and, once the whole window has completed,
 * either finishes the process (on error) or propagates the new offset to all
 * channels. Must be called on the process thread.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2748 
/*
 * Submit one process request for up to num_blocks starting at offset_blocks.
 * Returns the number of blocks actually submitted (the module may shrink the
 * request), 0 when no free request is available or the module accepted
 * nothing, or a negative errno on submission failure.
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* No free request - requests already in flight will resume the window */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may have accepted fewer blocks than requested */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2784 
/*
 * Fill the current window with process requests. Submits requests until the
 * window is covered, requests run out, or an error occurs. If nothing could
 * be submitted at all, the process is finished with the recorded status.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* The effective window size is whatever was actually submitted */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2809 
/*
 * Completion of quiescing the next window's LBA range. If a stop was
 * requested while the lock was being taken, release it again; otherwise run
 * the window.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2830 
/*
 * Token-bucket rate limiting for the background process. Refills the budget
 * proportionally to the ticks elapsed since the last call (capped at
 * bytes_max). If any budget is available, the current window's byte count is
 * consumed (the balance may go negative) and true is returned; otherwise
 * false, meaning the process must wait.
 */
static bool
raid_bdev_process_consume_token(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t now = spdk_get_ticks();

	process->qos.bytes_available = spdk_min(process->qos.bytes_max,
						process->qos.bytes_available +
						(now - process->qos.last_tsc) * process->qos.bytes_per_tsc);
	process->qos.last_tsc = now;
	if (process->qos.bytes_available > 0.0) {
		process->qos.bytes_available -= process->window_size * raid_bdev->bdev.blocklen;
		return true;
	}
	return false;
}
2847 
/*
 * Quiesce the LBA range of the next process window. With QoS enabled, the
 * window only proceeds when a token is available; otherwise the continue
 * poller is resumed to retry later and false is returned. Returns true when
 * the lock was initiated (its callback handles any error).
 */
static bool
raid_bdev_process_lock_window_range(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(process->window_range_locked == false);

	if (process->qos.enable_qos) {
		if (raid_bdev_process_consume_token(process)) {
			/* Budget available - no need to poll for more tokens */
			spdk_poller_pause(process->qos.process_continue_poller);
		} else {
			spdk_poller_resume(process->qos.process_continue_poller);
			return false;
		}
	}

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
	return true;
}
2873 
2874 static int
2875 raid_bdev_process_continue_poll(void *arg)
2876 {
2877 	struct raid_bdev_process *process = arg;
2878 
2879 	if (raid_bdev_process_lock_window_range(process)) {
2880 		return SPDK_POLLER_BUSY;
2881 	}
2882 	return SPDK_POLLER_IDLE;
2883 }
2884 
/*
 * Main iteration of the background process on its dedicated thread: finish
 * if stopping or the end of the bdev was reached, otherwise clamp the window
 * to the remaining blocks and lock the next window range.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Don't let the window extend past the end of the bdev */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);
	raid_bdev_process_lock_window_range(process);
}
2909 
/*
 * First message run on the newly created process thread: acquire the raid
 * bdev's IO channel, set up the optional QoS poller and start iterating.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	if (process->qos.enable_qos) {
		/* Registered paused; resumed only while waiting for QoS budget */
		process->qos.process_continue_poller = SPDK_POLLER_REGISTER(raid_bdev_process_continue_poll,
						       process, 0);
		spdk_poller_pause(process->qos.process_continue_poller);
	}

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2940 
/*
 * All channels have cleaned up their process state after a failed process
 * start: remove the would-be target base bdev and free the process.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2951 
/*
 * Per-channel rollback after a failed process start: discard any process
 * state that was set up on this channel.
 */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch;

	raid_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));
	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2962 
/*
 * All channels have set up their process state. Verify the target is still
 * usable, create the dedicated process thread and attach the process to the
 * raid bdev; on any failure, roll the channel setup back.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status == 0 &&
	    (process->target->remove_scheduled || !process->target->is_configured ||
	     raid_bdev->num_base_bdevs_operational <= raid_bdev->min_base_bdevs_operational)) {
		/* a base bdev was removed before we got here */
		status = -ENODEV;
	}

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* e.g. "<raid name>_rebuild" */
	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
3004 
/* Per-channel step of starting a process: set up the process state on this channel. */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch;

	raid_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	/* Pass the setup result on to the channel iterator. */
	spdk_for_each_channel_continue(i, raid_bdev_ch_process_setup(raid_ch, process));
}
3017 
3018 static void
3019 raid_bdev_process_start(struct raid_bdev_process *process)
3020 {
3021 	struct raid_bdev *raid_bdev = process->raid_bdev;
3022 
3023 	assert(raid_bdev->module->submit_process_request != NULL);
3024 
3025 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
3026 			      raid_bdev_channels_start_process_done);
3027 }
3028 
3029 static void
3030 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
3031 {
3032 	spdk_dma_free(process_req->iov.iov_base);
3033 	spdk_dma_free(process_req->md_buf);
3034 	free(process_req);
3035 }
3036 
3037 static struct raid_bdev_process_request *
3038 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
3039 {
3040 	struct raid_bdev *raid_bdev = process->raid_bdev;
3041 	struct raid_bdev_process_request *process_req;
3042 
3043 	process_req = calloc(1, sizeof(*process_req));
3044 	if (process_req == NULL) {
3045 		return NULL;
3046 	}
3047 
3048 	process_req->process = process;
3049 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
3050 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
3051 	if (process_req->iov.iov_base == NULL) {
3052 		free(process_req);
3053 		return NULL;
3054 	}
3055 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
3056 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
3057 		if (process_req->md_buf == NULL) {
3058 			raid_bdev_process_request_free(process_req);
3059 			return NULL;
3060 		}
3061 	}
3062 
3063 	return process_req;
3064 }
3065 
3066 static void
3067 raid_bdev_process_free(struct raid_bdev_process *process)
3068 {
3069 	struct raid_bdev_process_request *process_req;
3070 
3071 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
3072 		TAILQ_REMOVE(&process->requests, process_req, link);
3073 		raid_bdev_process_request_free(process_req);
3074 	}
3075 
3076 	free(process);
3077 }
3078 
3079 static struct raid_bdev_process *
3080 raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
3081 			struct raid_base_bdev_info *target)
3082 {
3083 	struct raid_bdev_process *process;
3084 	struct raid_bdev_process_request *process_req;
3085 	int i;
3086 
3087 	process = calloc(1, sizeof(*process));
3088 	if (process == NULL) {
3089 		return NULL;
3090 	}
3091 
3092 	process->raid_bdev = raid_bdev;
3093 	process->type = type;
3094 	process->target = target;
3095 	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
3096 					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
3097 					    raid_bdev->bdev.write_unit_size);
3098 	TAILQ_INIT(&process->requests);
3099 	TAILQ_INIT(&process->finish_actions);
3100 
3101 	if (g_opts.process_max_bandwidth_mb_sec != 0) {
3102 		process->qos.enable_qos = true;
3103 		process->qos.last_tsc = spdk_get_ticks();
3104 		process->qos.bytes_per_tsc = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 /
3105 					     spdk_get_ticks_hz();
3106 		process->qos.bytes_max = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 / SPDK_SEC_TO_MSEC;
3107 		process->qos.bytes_available = 0.0;
3108 	}
3109 
3110 	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
3111 		process_req = raid_bdev_process_alloc_request(process);
3112 		if (process_req == NULL) {
3113 			raid_bdev_process_free(process);
3114 			return NULL;
3115 		}
3116 
3117 		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
3118 	}
3119 
3120 	return process;
3121 }
3122 
3123 static int
3124 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
3125 {
3126 	struct raid_bdev_process *process;
3127 
3128 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3129 
3130 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
3131 	if (process == NULL) {
3132 		return -ENOMEM;
3133 	}
3134 
3135 	raid_bdev_process_start(process);
3136 
3137 	return 0;
3138 }
3139 
3140 static void raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info);
3141 
/* Channel-sync completion: resume base bdev configuration on the app thread. */
static void
_raid_bdev_configure_base_bdev_cont(struct spdk_io_channel_iter *i, int status)
{
	raid_bdev_configure_base_bdev_cont(spdk_io_channel_iter_get_ctx(i));
}
3149 
/* Per-channel no-op used purely to synchronize with all io channels before continuing. */
static void
raid_bdev_ch_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
3155 
/*
 * Finish configuration of a single base bdev. When the last operational member
 * of a fully-populated online array is being (re)added, it is first flagged as
 * a process (rebuild) target with a channel sync in between. Otherwise the base
 * bdev is marked configured and either the raid bdev itself is configured, or a
 * rebuild is started for a process target.
 *
 * Ownership of base_info->configure_cb is taken here: it is either invoked at
 * the end, or handed off to raid_bdev_configure().
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	raid_base_bdev_cb configure_cb;
	int rc;

	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational &&
	    base_info->is_process_target == false) {
		/* TODO: defer if rebuild in progress on another base bdev */
		assert(raid_bdev->process == NULL);
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		base_info->is_process_target = true;
		/* To assure is_process_target is set before is_configured when checked in raid_bdev_create_cb() */
		spdk_for_each_channel(raid_bdev, raid_bdev_ch_sync, base_info, _raid_bdev_configure_base_bdev_cont);
		return;
	}

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/* Take ownership of the callback; it must fire exactly once. */
	configure_cb = base_info->configure_cb;
	base_info->configure_cb = NULL;
	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev, configure_cb, base_info->configure_cb_ctx);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		} else {
			/* raid_bdev_configure() now owns the callback - do not call it below. */
			configure_cb = NULL;
		}
	} else if (base_info->is_process_target) {
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		rc = 0;
	}

	if (configure_cb != NULL) {
		configure_cb(base_info->configure_cb_ctx, rc);
	}
}
3211 
3212 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3213 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3214 
/*
 * Callback for the superblock check performed when adding a new base bdev.
 * - status == 0: a valid superblock exists on the bdev. If it belongs to this
 *   raid bdev, re-examine the bdev through the superblock path; otherwise fail
 *   with -EEXIST.
 * - status == -EINVAL: no superblock - continue normal configuration.
 * - other status: examination error; report it to the stored callback.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	raid_base_bdev_cb configure_cb = base_info->configure_cb;

	switch (status) {
	case 0:
		/* valid superblock found */
		base_info->configure_cb = NULL;
		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);

			/* Release the slot and restart configuration from the superblock contents. */
			raid_bdev_free_base_bdev_resource(base_info);
			raid_bdev_examine_sb(sb, bdev, configure_cb, base_info->configure_cb_ctx);
			return;
		}
		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	if (configure_cb != NULL) {
		base_info->configure_cb = NULL;
		configure_cb(base_info->configure_cb_ctx, status);
	}
}
3252 
/*
 * Open, claim and validate a base bdev, then continue raid bdev configuration.
 *
 * base_info - slot to configure; identified by base_info->name and/or ->uuid
 * existing  - true when attaching a bdev already known to the raid bdev
 *             (skips the on-disk superblock check done for brand-new bdevs)
 * cb_fn/cb_ctx - completion callback, stored in base_info and invoked when
 *             the (possibly asynchronous) configuration completes
 *
 * Must be called on the app thread. Returns 0 on success or when configuration
 * continues asynchronously, negative errno on failure (all acquired resources
 * are released before returning an error).
 */
static int
raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
			      raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	const struct spdk_uuid *bdev_uuid;
	int rc;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->desc == NULL);

	/*
	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
	 * before claiming the bdev.
	 */

	if (!spdk_uuid_is_null(&base_info->uuid)) {
		char uuid_str[SPDK_UUID_STRING_LEN];
		const char *bdev_name;

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* UUID of a bdev is registered as its alias */
		bdev = spdk_bdev_get_by_name(uuid_str);
		if (bdev == NULL) {
			return -ENODEV;
		}

		bdev_name = spdk_bdev_get_name(bdev);

		if (base_info->name == NULL) {
			assert(existing == true);
			base_info->name = strdup(bdev_name);
			if (base_info->name == NULL) {
				return -ENOMEM;
			}
		} else if (strcmp(base_info->name, bdev_name) != 0) {
			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
				    bdev_name, base_info->name);
			return -EINVAL;
		}
	}

	assert(base_info->name != NULL);

	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
	if (rc != 0) {
		if (rc != -ENODEV) {
			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
		}
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	bdev_uuid = spdk_bdev_get_uuid(bdev);

	if (spdk_uuid_is_null(&base_info->uuid)) {
		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
		spdk_bdev_close(desc);
		return -EINVAL;
	}

	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
		spdk_bdev_close(desc);
		return rc;
	}

	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);

	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
	if (base_info->app_thread_ch == NULL) {
		SPDK_ERRLOG("Failed to get io channel\n");
		spdk_bdev_module_release_bdev(bdev);
		spdk_bdev_close(desc);
		return -ENOMEM;
	}

	base_info->desc = desc;
	base_info->blockcnt = bdev->blockcnt;

	if (raid_bdev->superblock_enabled) {
		uint64_t data_offset;

		if (base_info->data_offset == 0) {
			/* reserve the superblock area at the start of the bdev */
			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
		} else {
			data_offset = base_info->data_offset;
		}

		if (bdev->optimal_io_boundary != 0) {
			/* align the data offset up to the optimal io boundary */
			data_offset = spdk_divide_round_up(data_offset,
							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
				/* NOTE(review): %lu assumes 64-bit long for uint64_t args; PRIu64 would be portable */
				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
					     base_info->data_offset, base_info->name, data_offset);
				data_offset = base_info->data_offset;
			}
		}

		base_info->data_offset = data_offset;
	}

	if (base_info->data_offset >= bdev->blockcnt) {
		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
			    base_info->data_offset, bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (base_info->data_size == 0) {
		base_info->data_size = bdev->blockcnt - base_info->data_offset;
	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
			    bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
			    bdev->name);
		rc = -EINVAL;
		goto out;
	}

	/*
	 * Set the raid bdev properties if this is the first base bdev configured,
	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
	 * have the same blocklen and metadata format.
	 */
	if (raid_bdev->bdev.blocklen == 0) {
		raid_bdev->bdev.blocklen = bdev->blocklen;
		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
		raid_bdev->bdev.dif_pi_format = bdev->dif_pi_format;
	} else {
		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
			rc = -EINVAL;
			goto out;
		}

		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev) ||
		    raid_bdev->bdev.dif_pi_format != bdev->dif_pi_format) {
			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
				    raid_bdev->bdev.name, bdev->name);
			rc = -EINVAL;
			goto out;
		}
	}

	assert(base_info->configure_cb == NULL);
	base_info->configure_cb = cb_fn;
	base_info->configure_cb_ctx = cb_ctx;

	if (existing) {
		raid_bdev_configure_base_bdev_cont(base_info);
	} else {
		/* check for existing superblock when using a new bdev */
		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
		if (rc) {
			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
				    bdev->name, spdk_strerror(-rc));
		}
	}
out:
	if (rc != 0) {
		/* release everything acquired above: channel, claim, descriptor */
		base_info->configure_cb = NULL;
		raid_bdev_free_base_bdev_resource(base_info);
	}
	return rc;
}
3441 
/*
 * Add the named bdev as a base bdev of the raid bdev.
 *
 * Slot selection: while the raid bdev is CONFIGURING, prefer a slot whose uuid
 * matches the bdev and whose name is still unset (e.g. a slot known from a
 * superblock). Otherwise - and always when ONLINE - use the first completely
 * empty slot (no name, null uuid).
 *
 * Must be called on the app thread. Returns 0 on success or negative errno;
 * cb_fn is invoked when the (possibly asynchronous) configuration completes.
 */
int
raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
			raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_base_bdev_info *base_info = NULL, *iter;
	int rc;

	assert(name != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->process != NULL) {
		SPDK_ERRLOG("raid bdev '%s' is in process\n",
			    raid_bdev->bdev.name);
		return -EPERM;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);

		if (bdev != NULL) {
			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
				if (iter->name == NULL &&
				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
					base_info = iter;
					break;
				}
			}
		}
	}

	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
				base_info = iter;
				break;
			}
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
			    raid_bdev->bdev.name, name);
		return -EINVAL;
	}

	assert(base_info->is_configured == false);

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		assert(base_info->data_size != 0);
		assert(base_info->desc == NULL);
	}

	base_info->name = strdup(name);
	if (base_info->name == NULL) {
		return -ENOMEM;
	}

	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
		/* -ENODEV while configuring is tolerated - the bdev may appear later */
		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
		free(base_info->name);
		base_info->name = NULL;
	}

	return rc;
}
3508 
3509 static int
3510 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3511 {
3512 	struct raid_bdev *raid_bdev;
3513 	uint8_t i;
3514 	int rc;
3515 
3516 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3517 			       sb->level, true, &sb->uuid, &raid_bdev);
3518 	if (rc != 0) {
3519 		return rc;
3520 	}
3521 
3522 	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3523 	if (rc != 0) {
3524 		raid_bdev_free(raid_bdev);
3525 		return rc;
3526 	}
3527 
3528 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3529 	memcpy(raid_bdev->sb, sb, sb->length);
3530 
3531 	for (i = 0; i < sb->base_bdevs_size; i++) {
3532 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3533 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3534 
3535 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3536 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3537 			raid_bdev->num_base_bdevs_operational++;
3538 		}
3539 
3540 		base_info->data_offset = sb_base_bdev->data_offset;
3541 		base_info->data_size = sb_base_bdev->data_size;
3542 	}
3543 
3544 	*raid_bdev_out = raid_bdev;
3545 	return 0;
3546 }
3547 
3548 static void
3549 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3550 {
3551 	struct raid_bdev *raid_bdev;
3552 	struct raid_base_bdev_info *base_info;
3553 
3554 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3555 		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3556 			continue;
3557 		}
3558 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3559 			if (base_info->desc == NULL &&
3560 			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3561 			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3562 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3563 				break;
3564 			}
3565 		}
3566 	}
3567 }
3568 
/* Context carried through the chain that examines the remaining base bdevs of a raid bdev */
struct raid_bdev_examine_others_ctx {
	/* uuid of the raid bdev whose members are being examined */
	struct spdk_uuid raid_bdev_uuid;
	/* slot index to resume the search from on the next iteration */
	uint8_t current_base_bdev_idx;
	/* user callback invoked when the whole chain completes */
	raid_base_bdev_cb cb_fn;
	void *cb_ctx;
};
3575 
3576 static void
3577 raid_bdev_examine_others_done(void *_ctx, int status)
3578 {
3579 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3580 
3581 	if (ctx->cb_fn != NULL) {
3582 		ctx->cb_fn(ctx->cb_ctx, status);
3583 	}
3584 	free(ctx);
3585 }
3586 
3587 typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
3588 		const struct raid_bdev_superblock *sb, int status, void *ctx);
3589 static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
3590 				     void *cb_ctx);
3591 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3592 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3593 static void raid_bdev_examine_others(void *_ctx, int status);
3594 
3595 static void
3596 raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3597 				 int status, void *_ctx)
3598 {
3599 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3600 
3601 	if (status != 0) {
3602 		raid_bdev_examine_others_done(ctx, status);
3603 		return;
3604 	}
3605 
3606 	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3607 }
3608 
/*
 * Continuation step of examining the remaining base bdevs of a raid bdev.
 * Scans slots starting at ctx->current_base_bdev_idx for an unconfigured
 * member whose bdev is already registered (looked up by uuid alias) and
 * kicks off its superblock load; the load callback re-enters this function.
 * When no further candidate is found, the chain finishes with the last status.
 */
static void
raid_bdev_examine_others(void *_ctx, int status)
{
	struct raid_bdev_examine_others_ctx *ctx = _ctx;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	/* -EEXIST (member already configured) is not fatal to the chain */
	if (status != 0 && status != -EEXIST) {
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
	if (raid_bdev == NULL) {
		status = -ENODEV;
		goto out;
	}

	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
	     base_info++) {
		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
			continue;
		}

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* the member's bdev must already be registered to examine it */
		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
			continue;
		}

		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);

		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
		if (status != 0) {
			continue;
		}
		return;
	}
out:
	raid_bdev_examine_others_done(ctx, status);
}
3651 
/*
 * Examine a bdev that carries a raid superblock.
 *
 * If a raid bdev with the superblock's uuid already exists, superblock
 * seq_numbers are reconciled: a newer on-disk superblock causes the existing
 * (still configuring) raid bdev to be deleted and re-created from it; an older
 * one is ignored in favor of the existing raid bdev's superblock. If no raid
 * bdev exists yet, one is created from the superblock and the remaining
 * members are examined via raid_bdev_examine_others(). Finally the bdev is
 * configured into its slot. cb_fn (if set) is called with the result.
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
		     raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
		rc = -EINVAL;
		goto out;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		rc = -EINVAL;
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);

	if (raid_bdev) {
		if (raid_bdev->sb == NULL) {
			SPDK_WARNLOG("raid superblock is null\n");
			rc = -EINVAL;
			goto out;
		}

		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				rc = -EBUSY;
				goto out;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* find this bdev's entry in the superblock by uuid */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev) {
		struct raid_bdev_examine_others_ctx *ctx;

		ctx = calloc(1, sizeof(*ctx));
		if (ctx == NULL) {
			rc = -ENOMEM;
			goto out;
		}

		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			free(ctx);
			goto out;
		}

		/* after this base bdev is configured, examine other base bdevs that may be present */
		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
		ctx->cb_fn = cb_fn;
		ctx->cb_ctx = cb_ctx;

		cb_fn = raid_bdev_examine_others;
		cb_ctx = ctx;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		/* re-add a previously missing/failed member to the online array */
		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
		assert(base_info->is_configured == false);
		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
		assert(spdk_uuid_is_null(&base_info->uuid));
		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
		}
		goto out;
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		rc = -EINVAL;
		goto out;
	}

	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		rc = -EINVAL;
		goto out;
	}

	if (base_info->is_configured) {
		rc = -EEXIST;
		goto out;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
out:
	if (rc != 0 && cb_fn != 0) {
		cb_fn(cb_ctx, rc);
	}
}
3804 
/* Context for a one-shot superblock read during bdev examination */
struct raid_bdev_examine_ctx {
	/* temporary descriptor used only for reading the superblock */
	struct spdk_bdev_desc *desc;
	/* io channel obtained from desc for the read */
	struct spdk_io_channel *ch;
	/* callback invoked with the load result */
	raid_bdev_examine_load_sb_cb cb;
	void *cb_ctx;
};
3811 
3812 static void
3813 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3814 {
3815 	if (!ctx) {
3816 		return;
3817 	}
3818 
3819 	if (ctx->ch) {
3820 		spdk_put_io_channel(ctx->ch);
3821 	}
3822 
3823 	if (ctx->desc) {
3824 		spdk_bdev_close(ctx->desc);
3825 	}
3826 
3827 	free(ctx);
3828 }
3829 
3830 static void
3831 raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3832 {
3833 	struct raid_bdev_examine_ctx *ctx = _ctx;
3834 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3835 
3836 	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3837 
3838 	raid_bdev_examine_ctx_free(ctx);
3839 }
3840 
/* No-op event callback for the short-lived descriptor used to read a superblock. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3845 
3846 static int
3847 raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
3848 {
3849 	struct raid_bdev_examine_ctx *ctx;
3850 	int rc;
3851 
3852 	assert(cb != NULL);
3853 
3854 	ctx = calloc(1, sizeof(*ctx));
3855 	if (!ctx) {
3856 		return -ENOMEM;
3857 	}
3858 
3859 	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
3860 	if (rc) {
3861 		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
3862 		goto err;
3863 	}
3864 
3865 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3866 	if (!ctx->ch) {
3867 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
3868 		rc = -ENOMEM;
3869 		goto err;
3870 	}
3871 
3872 	ctx->cb = cb;
3873 	ctx->cb_ctx = cb_ctx;
3874 
3875 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
3876 	if (rc) {
3877 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3878 			    bdev_name, spdk_strerror(-rc));
3879 		goto err;
3880 	}
3881 
3882 	return 0;
3883 err:
3884 	raid_bdev_examine_ctx_free(ctx);
3885 	return rc;
3886 }
3887 
3888 static void
3889 raid_bdev_examine_done(void *ctx, int status)
3890 {
3891 	struct spdk_bdev *bdev = ctx;
3892 
3893 	if (status != 0) {
3894 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3895 			    bdev->name, spdk_strerror(-status));
3896 	}
3897 	spdk_bdev_module_examine_done(&g_raid_if);
3898 }
3899 
3900 static void
3901 raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3902 		       void *ctx)
3903 {
3904 	switch (status) {
3905 	case 0:
3906 		/* valid superblock found */
3907 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3908 		raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_done, bdev);
3909 		return;
3910 	case -EINVAL:
3911 		/* no valid superblock, check if it can be claimed anyway */
3912 		raid_bdev_examine_no_sb(bdev);
3913 		status = 0;
3914 		break;
3915 	}
3916 
3917 	raid_bdev_examine_done(bdev, status);
3918 }
3919 
3920 /*
3921  * brief:
3922  * raid_bdev_examine function is the examine function call by the below layers
3923  * like bdev_nvme layer. This function will check if this base bdev can be
3924  * claimed by this raid bdev or not.
3925  * params:
3926  * bdev - pointer to base bdev
3927  * returns:
3928  * none
3929  */
3930 static void
3931 raid_bdev_examine(struct spdk_bdev *bdev)
3932 {
3933 	int rc = 0;
3934 
3935 	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
3936 		goto done;
3937 	}
3938 
3939 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3940 		raid_bdev_examine_no_sb(bdev);
3941 		goto done;
3942 	}
3943 
3944 	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
3945 	if (rc != 0) {
3946 		goto done;
3947 	}
3948 
3949 	return;
3950 done:
3951 	raid_bdev_examine_done(bdev, rc);
3952 }
3953 
3954 /* Log component for bdev raid bdev module */
3955 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3956 
/* Register trace points describing the lifetime of a raid bdev IO. */
static void
bdev_raid_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BDEV_RAID_IO_START", TRACE_BDEV_RAID_IO_START,
			OWNER_TYPE_NONE, OBJECT_BDEV_RAID_IO, 1,
			{{ "ctx", SPDK_TRACE_ARG_TYPE_PTR, 8 }}
		},
		{
			"BDEV_RAID_IO_DONE", TRACE_BDEV_RAID_IO_DONE,
			OWNER_TYPE_NONE, OBJECT_BDEV_RAID_IO, 0,
			{{ "ctx", SPDK_TRACE_ARG_TYPE_PTR, 8 }}
		}
	};


	spdk_trace_register_object(OBJECT_BDEV_RAID_IO, 'R');
	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
	/* relate raid IO trace points to the generic bdev IO trace points */
	spdk_trace_tpoint_register_relation(TRACE_BDEV_IO_START, OBJECT_BDEV_RAID_IO, 1);
	spdk_trace_tpoint_register_relation(TRACE_BDEV_IO_DONE, OBJECT_BDEV_RAID_IO, 0);
}
SPDK_TRACE_REGISTER_FN(bdev_raid_trace, "bdev_raid", TRACE_GROUP_BDEV_RAID)
3980