/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2019 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "vbdev_delay.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

/* This namespace UUID was generated with uuid_generate(). */
#define BDEV_DELAY_NAMESPACE_UUID "4009b574-6430-4f1b-bc40-ace811091027"
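/* It is combined with the base bdev's UUID via spdk_uuid_generate_sha1() below to derive
 * a stable, unique UUID for the delay bdev when the user does not supply one.
 */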

static int vbdev_delay_init(void);
static int vbdev_delay_get_ctx_size(void);
static void vbdev_delay_examine(struct spdk_bdev *bdev);
static void vbdev_delay_finish(void);
static int vbdev_delay_config_json(struct spdk_json_write_ctx *w);

static struct spdk_bdev_module delay_if = {
	.name = "delay",
	.module_init = vbdev_delay_init,
	.get_ctx_size = vbdev_delay_get_ctx_size,
	.examine_config = vbdev_delay_examine,
	.module_fini = vbdev_delay_finish,
	.config_json = vbdev_delay_config_json
};

SPDK_BDEV_MODULE_REGISTER(delay, &delay_if)

/* Associative list to be used in examine */
struct bdev_association {
	char			*vbdev_name;
	char			*bdev_name;
	struct spdk_uuid	uuid;
	uint64_t		avg_read_latency;
	uint64_t		p99_read_latency;
	uint64_t		avg_write_latency;
	uint64_t		p99_write_latency;
	TAILQ_ENTRY(bdev_association)	link;
};
static TAILQ_HEAD(, bdev_association) g_bdev_associations = TAILQ_HEAD_INITIALIZER(
			g_bdev_associations);
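/* Associations record the desired configuration before the base bdev necessarily exists;
 * vbdev_delay_examine() consults this list whenever a new bdev appears.
 */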

/* List of virtual bdevs and associated info for each. */
struct vbdev_delay {
	struct spdk_bdev		*base_bdev; /* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc; /* its descriptor we get from open */
	struct spdk_bdev		delay_bdev;    /* the delay virtual bdev */
	uint64_t			average_read_latency_ticks; /* the average read delay */
	uint64_t			p99_read_latency_ticks; /* the p99 read delay */
	uint64_t			average_write_latency_ticks; /* the average write delay */
	uint64_t			p99_write_latency_ticks; /* the p99 write delay */
	TAILQ_ENTRY(vbdev_delay)	link;
	struct spdk_thread		*thread;    /* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_delay) g_delay_nodes = TAILQ_HEAD_INITIALIZER(g_delay_nodes);

struct delay_bdev_io {
	int status;

	uint64_t completion_tick;

	enum delay_io_type type;

	struct spdk_io_channel *ch;

	struct spdk_bdev_io_wait_entry bdev_io_wait;

	struct spdk_bdev_io *zcopy_bdev_io;

	STAILQ_ENTRY(delay_bdev_io) link;
};

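/* Per-channel state: one STAILQ per latency class holds I/O that have already completed
 * on the base bdev and are waiting out their artificial deadline; io_poller drains them.
 */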
struct delay_io_channel {
	struct spdk_io_channel	*base_ch; /* IO channel of base device */
	STAILQ_HEAD(, delay_bdev_io) avg_read_io;
	STAILQ_HEAD(, delay_bdev_io) p99_read_io;
	STAILQ_HEAD(, delay_bdev_io) avg_write_io;
	STAILQ_HEAD(, delay_bdev_io) p99_write_io;
	struct spdk_poller *io_poller;
	unsigned int rand_seed;
};

static void vbdev_delay_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);


/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_delay *delay_node  = io_device;

	/* Done with this delay_node. */
	free(delay_node->delay_bdev.name);
	free(delay_node);
}

static void
_vbdev_delay_destruct(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

static int
vbdev_delay_destruct(void *ctx)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	/* It is important to follow this exact sequence when destroying a vbdev:
	 * remove it from the global list, release the claim on the base bdev, close the
	 * base descriptor on the thread that opened it, then unregister the io_device
	 * (its callback frees the node).
	 */

	TAILQ_REMOVE(&g_delay_nodes, delay_node, link);

	/* Unclaim the underlying bdev. */
	spdk_bdev_module_release_bdev(delay_node->base_bdev);

	/* Close the underlying bdev on the same thread that opened it. */
	if (delay_node->thread && delay_node->thread != spdk_get_thread()) {
		spdk_thread_send_msg(delay_node->thread, _vbdev_delay_destruct, delay_node->base_desc);
	} else {
		spdk_bdev_close(delay_node->base_desc);
	}

	/* Unregister the io_device. */
	spdk_io_device_unregister(delay_node, _device_unregister_cb);

	return 0;
}

static int
_process_io_stailq(void *arg, uint64_t ticks)
{
	STAILQ_HEAD(, delay_bdev_io) *head = arg;
	struct delay_bdev_io *io_ctx, *tmp;
	int completions = 0;

	STAILQ_FOREACH_SAFE(io_ctx, head, link, tmp) {
		if (io_ctx->completion_tick <= ticks) {
			STAILQ_REMOVE(head, io_ctx, delay_bdev_io, link);
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(io_ctx), io_ctx->status);
			completions++;
		} else {
			/* In the general case, I/O completes in FIFO order. When the latencies are changed
			 * dynamically, that is not necessarily true, but FIFO order is restored once the
			 * I/O outstanding at the time of the change has completed. Moving from a high to a
			 * low latency therefore dams up I/O submitted after the change, which is desirable
			 * when trying to trigger a pre-defined timeout on an initiator.
			 */
			break;
		}
	}

	return completions;
}

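/* Channel poller: complete any delayed I/O whose deadline has passed. Returning
 * SPDK_POLLER_BUSY only when work was done lets the reactor track idle time accurately.
 */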
static int
_delay_finish_io(void *arg)
{
	struct delay_io_channel *delay_ch = arg;
	uint64_t ticks = spdk_get_ticks();
	int completions = 0;

	completions += _process_io_stailq(&delay_ch->avg_read_io, ticks);
	completions += _process_io_stailq(&delay_ch->avg_write_io, ticks);
	completions += _process_io_stailq(&delay_ch->p99_read_io, ticks);
	completions += _process_io_stailq(&delay_ch->p99_write_io, ticks);

	return completions == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

/* Completion callback for I/O that was issued from this bdev. The original bdev_io
 * is passed in as an arg so we'll complete that one with the appropriate status
 * and then free the one that this module issued.
 */
static void
_delay_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *orig_io = cb_arg;
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(orig_io->bdev, struct vbdev_delay, delay_bdev);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)orig_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);

	io_ctx->status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_ZCOPY && bdev_io->u.bdev.zcopy.start && success) {
		io_ctx->zcopy_bdev_io = bdev_io;
	} else {
		assert(io_ctx->zcopy_bdev_io == NULL || io_ctx->zcopy_bdev_io == bdev_io);
		io_ctx->zcopy_bdev_io = NULL;
		spdk_bdev_free_io(bdev_io);
	}

	/* Put the I/O into the proper list for processing by the channel poller. */
	switch (io_ctx->type) {
	case DELAY_AVG_READ:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->average_read_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->avg_read_io, io_ctx, link);
		break;
	case DELAY_AVG_WRITE:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->average_write_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->avg_write_io, io_ctx, link);
		break;
	case DELAY_P99_READ:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->p99_read_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->p99_read_io, io_ctx, link);
		break;
	case DELAY_P99_WRITE:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->p99_write_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->p99_write_io, io_ctx, link);
		break;
	case DELAY_NONE:
	default:
		spdk_bdev_io_complete(orig_io, io_ctx->status);
		break;
	}
}

static void
vbdev_delay_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;

	vbdev_delay_submit_request(io_ctx->ch, bdev_io);
}

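/* If the base bdev ran out of spdk_bdev_io resources (-ENOMEM), register a wait entry so
 * the request is resubmitted via vbdev_delay_resubmit_io() once an I/O frees up on this
 * channel.
 */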
static void
vbdev_delay_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_delay_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, delay_ch->base_ch, &io_ctx->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in vbdev_delay_queue_io, rc=%d.\n", rc);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

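/* Forward the original request's memory domain, domain context and metadata buffer to the
 * base bdev, so requests whose buffers live in a non-local memory domain still work
 * through the passthrough.
 */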
static void
delay_init_ext_io_opts(struct spdk_bdev_io *bdev_io, struct spdk_bdev_ext_io_opts *opts)
{
	memset(opts, 0, sizeof(*opts));
	opts->size = sizeof(*opts);
	opts->memory_domain = bdev_io->u.bdev.memory_domain;
	opts->memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
	opts->metadata = bdev_io->u.bdev.md_buf;
}

static void
delay_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_delay,
					 delay_bdev);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);
	struct spdk_bdev_ext_io_opts io_opts;
	int rc;

	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	delay_init_ext_io_opts(bdev_io, &io_opts);
	rc = spdk_bdev_readv_blocks_ext(delay_node->base_desc, delay_ch->base_ch, bdev_io->u.bdev.iovs,
					bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
					bdev_io->u.bdev.num_blocks, _delay_complete_io,
					bdev_io, &io_opts);

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory, start to queue io for delay.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
vbdev_delay_reset_dev(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);
	struct vbdev_delay *delay_node = spdk_io_channel_iter_get_io_device(i);
	int rc;

	rc = spdk_bdev_reset(delay_node->base_desc, delay_ch->base_ch,
			     _delay_complete_io, bdev_io);

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory, start to queue io for delay.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
abort_zcopy_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
}

static void
_abort_all_delayed_io(void *arg)
{
	STAILQ_HEAD(, delay_bdev_io) *head = arg;
	struct delay_bdev_io *io_ctx, *tmp;

	STAILQ_FOREACH_SAFE(io_ctx, head, link, tmp) {
		STAILQ_REMOVE(head, io_ctx, delay_bdev_io, link);
		if (io_ctx->zcopy_bdev_io != NULL) {
			spdk_bdev_zcopy_end(io_ctx->zcopy_bdev_io, false, abort_zcopy_io, NULL);
		}
		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(io_ctx), SPDK_BDEV_IO_STATUS_ABORTED);
	}
}

static void
vbdev_delay_reset_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);

	_abort_all_delayed_io(&delay_ch->avg_read_io);
	_abort_all_delayed_io(&delay_ch->avg_write_io);
	_abort_all_delayed_io(&delay_ch->p99_read_io);
	_abort_all_delayed_io(&delay_ch->p99_write_io);

	spdk_for_each_channel_continue(i, 0);
}

static bool
abort_delayed_io(void *_head, struct spdk_bdev_io *bio_to_abort)
{
	STAILQ_HEAD(, delay_bdev_io) *head = _head;
	struct delay_bdev_io *io_ctx_to_abort = (struct delay_bdev_io *)bio_to_abort->driver_ctx;
	struct delay_bdev_io *io_ctx;

	STAILQ_FOREACH(io_ctx, head, link) {
		if (io_ctx == io_ctx_to_abort) {
			STAILQ_REMOVE(head, io_ctx_to_abort, delay_bdev_io, link);
			if (io_ctx->zcopy_bdev_io != NULL) {
				spdk_bdev_zcopy_end(io_ctx->zcopy_bdev_io, false, abort_zcopy_io, NULL);
			}
			spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED);
			return true;
		}
	}

	return false;
}

static int
vbdev_delay_abort(struct vbdev_delay *delay_node, struct delay_io_channel *delay_ch,
		  struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

	if (abort_delayed_io(&delay_ch->avg_read_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->avg_write_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->p99_read_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->p99_write_io, bio_to_abort)) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
		return 0;
	}

	return spdk_bdev_abort(delay_node->base_desc, delay_ch->base_ch, bio_to_abort,
			       _delay_complete_io, bdev_io);
}

static void
vbdev_delay_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_delay, delay_bdev);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	struct spdk_bdev_ext_io_opts io_opts;
	int rc = 0;
	bool is_p99;

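	/* rand_r() % 100 == 0 routes roughly 1 in 100 I/Os through the p99 latency path. */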
	is_p99 = rand_r(&delay_ch->rand_seed) % 100 == 0;

	io_ctx->ch = ch;
	io_ctx->type = DELAY_NONE;
	if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY || bdev_io->u.bdev.zcopy.start) {
		io_ctx->zcopy_bdev_io = NULL;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		io_ctx->type = is_p99 ? DELAY_P99_READ : DELAY_AVG_READ;
		spdk_bdev_io_get_buf(bdev_io, delay_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		io_ctx->type = is_p99 ? DELAY_P99_WRITE : DELAY_AVG_WRITE;
		delay_init_ext_io_opts(bdev_io, &io_opts);
		rc = spdk_bdev_writev_blocks_ext(delay_node->base_desc, delay_ch->base_ch, bdev_io->u.bdev.iovs,
						 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
						 bdev_io->u.bdev.num_blocks, _delay_complete_io,
						 bdev_io, &io_opts);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(delay_node->base_desc, delay_ch->base_ch,
						   bdev_io->u.bdev.offset_blocks,
						   bdev_io->u.bdev.num_blocks,
						   _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(delay_node->base_desc, delay_ch->base_ch,
					    bdev_io->u.bdev.offset_blocks,
					    bdev_io->u.bdev.num_blocks,
					    _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(delay_node->base_desc, delay_ch->base_ch,
					    bdev_io->u.bdev.offset_blocks,
					    bdev_io->u.bdev.num_blocks,
					    _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
		/* During reset, the generic bdev layer aborts all new I/O and queues all new resets,
		 * so we can simply abort every I/O currently being delayed on each channel.
		 */
		spdk_for_each_channel(delay_node, vbdev_delay_reset_channel, bdev_io,
				      vbdev_delay_reset_dev);
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = vbdev_delay_abort(delay_node, delay_ch, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.commit) {
			io_ctx->type = is_p99 ? DELAY_P99_WRITE : DELAY_AVG_WRITE;
		} else if (bdev_io->u.bdev.zcopy.populate) {
			io_ctx->type = is_p99 ? DELAY_P99_READ : DELAY_AVG_READ;
		}
		if (bdev_io->u.bdev.zcopy.start) {
			rc = spdk_bdev_zcopy_start(delay_node->base_desc, delay_ch->base_ch,
						   bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
						   bdev_io->u.bdev.offset_blocks,
						   bdev_io->u.bdev.num_blocks,
						   bdev_io->u.bdev.zcopy.populate,
						   _delay_complete_io, bdev_io);
		} else {
			rc = spdk_bdev_zcopy_end(io_ctx->zcopy_bdev_io, bdev_io->u.bdev.zcopy.commit,
						 _delay_complete_io, bdev_io);
		}
		break;
	default:
		SPDK_ERRLOG("delay: unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory, start to queue io for delay.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static bool
vbdev_delay_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	return spdk_bdev_io_type_supported(delay_node->base_bdev, io_type);
}

static struct spdk_io_channel *
vbdev_delay_get_io_channel(void *ctx)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;
	struct spdk_io_channel *delay_ch = NULL;

	delay_ch = spdk_get_io_channel(delay_node);

	return delay_ch;
}

static void
_delay_write_conf_values(struct vbdev_delay *delay_node, struct spdk_json_write_ctx *w)
{
	struct spdk_uuid *uuid = &delay_node->delay_bdev.uuid;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&delay_node->delay_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(delay_node->base_bdev));
	if (!spdk_uuid_is_null(uuid)) {
		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), uuid);
		spdk_json_write_named_string(w, "uuid", uuid_str);
	}
	spdk_json_write_named_int64(w, "avg_read_latency",
				    delay_node->average_read_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "p99_read_latency",
				    delay_node->p99_read_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "avg_write_latency",
				    delay_node->average_write_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "p99_write_latency",
				    delay_node->p99_write_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

static int
vbdev_delay_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	spdk_json_write_name(w, "delay");
	spdk_json_write_object_begin(w);
	_delay_write_conf_values(delay_node, w);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_delay_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_delay *delay_node;

	TAILQ_FOREACH(delay_node, &g_delay_nodes, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_delay_create");
		spdk_json_write_named_object_begin(w, "params");
		_delay_write_conf_values(delay_node, w);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. We also register
 * the per-channel poller that completes delayed I/O.
 */
static int
delay_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct delay_io_channel *delay_ch = ctx_buf;
	struct vbdev_delay *delay_node = io_device;

	STAILQ_INIT(&delay_ch->avg_read_io);
	STAILQ_INIT(&delay_ch->p99_read_io);
	STAILQ_INIT(&delay_ch->avg_write_io);
	STAILQ_INIT(&delay_ch->p99_write_io);

	delay_ch->io_poller = SPDK_POLLER_REGISTER(_delay_finish_io, delay_ch, 0);
	delay_ch->base_ch = spdk_bdev_get_io_channel(delay_node->base_desc);
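	/* Seed the per-channel generator; rand_r() keeps its state in delay_ch, so each
	 * channel draws p99 samples without locking or sharing state across threads.
	 */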
	delay_ch->rand_seed = time(NULL);

	return 0;
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created, i.e. unregister the poller and put the base channel.
 */
static void
delay_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct delay_io_channel *delay_ch = ctx_buf;

	spdk_poller_unregister(&delay_ch->io_poller);
	spdk_put_io_channel(delay_ch->base_ch);
}

/* Create the delay association from the bdev and vbdev name and insert
 * on the global list. */
static int
vbdev_delay_insert_association(const char *bdev_name, const char *vbdev_name,
			       struct spdk_uuid *uuid,
			       uint64_t avg_read_latency, uint64_t p99_read_latency,
			       uint64_t avg_write_latency, uint64_t p99_write_latency)
{
	struct bdev_association *assoc;

	TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
		if (strcmp(vbdev_name, assoc->vbdev_name) == 0) {
			SPDK_ERRLOG("delay bdev %s already exists\n", vbdev_name);
			return -EEXIST;
		}
	}

	assoc = calloc(1, sizeof(struct bdev_association));
	if (!assoc) {
		SPDK_ERRLOG("could not allocate bdev_association\n");
		return -ENOMEM;
	}

	assoc->bdev_name = strdup(bdev_name);
	if (!assoc->bdev_name) {
		SPDK_ERRLOG("could not allocate assoc->bdev_name\n");
		free(assoc);
		return -ENOMEM;
	}

	assoc->vbdev_name = strdup(vbdev_name);
	if (!assoc->vbdev_name) {
		SPDK_ERRLOG("could not allocate assoc->vbdev_name\n");
		free(assoc->bdev_name);
		free(assoc);
		return -ENOMEM;
	}

	assoc->avg_read_latency = avg_read_latency;
	assoc->p99_read_latency = p99_read_latency;
	assoc->avg_write_latency = avg_write_latency;
	assoc->p99_write_latency = p99_write_latency;

	if (uuid) {
		spdk_uuid_copy(&assoc->uuid, uuid);
	}

	TAILQ_INSERT_TAIL(&g_bdev_associations, assoc, link);

	return 0;
}

int
vbdev_delay_update_latency_value(char *delay_name, uint64_t latency_us, enum delay_io_type type)
{
	struct vbdev_delay *delay_node;
	uint64_t ticks_mhz = spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
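	/* ticks_mhz is ticks per microsecond; e.g. with a 2.4 GHz timestamp counter,
	 * ticks_mhz = 2400 and a 100 us latency is stored as 240000 ticks.
	 */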

	TAILQ_FOREACH(delay_node, &g_delay_nodes, link) {
		if (strcmp(delay_node->delay_bdev.name, delay_name) == 0) {
			break;
		}
	}

	if (delay_node == NULL) {
		return -ENODEV;
	}

	switch (type) {
	case DELAY_AVG_READ:
		delay_node->average_read_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_AVG_WRITE:
		delay_node->average_write_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_P99_READ:
		delay_node->p99_read_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_P99_WRITE:
		delay_node->p99_write_latency_ticks = ticks_mhz * latency_us;
		break;
	default:
		return -EINVAL;
	}

	return 0;
}

static int
vbdev_delay_init(void)
{
	/* Not allowing for .ini style configuration. */
	return 0;
}

static void
vbdev_delay_finish(void)
{
	struct bdev_association *assoc;

	while ((assoc = TAILQ_FIRST(&g_bdev_associations))) {
		TAILQ_REMOVE(&g_bdev_associations, assoc, link);
		free(assoc->bdev_name);
		free(assoc->vbdev_name);
		free(assoc);
	}
}

static int
vbdev_delay_get_ctx_size(void)
{
	return sizeof(struct delay_bdev_io);
}

static void
vbdev_delay_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	/* No config per bdev needed */
}

static int
vbdev_delay_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	/* The delay bdev never touches data buffers itself, so it supports any memory domain used by base_bdev. */
	return spdk_bdev_get_memory_domains(delay_node->base_bdev, domains, array_size);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_delay_fn_table = {
	.destruct		= vbdev_delay_destruct,
	.submit_request		= vbdev_delay_submit_request,
	.io_type_supported	= vbdev_delay_io_type_supported,
	.get_io_channel		= vbdev_delay_get_io_channel,
	.dump_info_json		= vbdev_delay_dump_info_json,
	.write_config_json	= vbdev_delay_write_config_json,
	.get_memory_domains	= vbdev_delay_get_memory_domains,
};

static void
vbdev_delay_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_delay *delay_node, *tmp;

	TAILQ_FOREACH_SAFE(delay_node, &g_delay_nodes, link, tmp) {
		if (bdev_find == delay_node->base_bdev) {
			spdk_bdev_unregister(&delay_node->delay_bdev, NULL, NULL);
		}
	}
}

/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_delay_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
			       void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_delay_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* Create and register the delay vbdev if we find it in our list of bdev names.
 * This can be called either by the examine path or RPC method.
 */
static int
vbdev_delay_register(const char *bdev_name)
{
	struct bdev_association *assoc;
	struct vbdev_delay *delay_node;
	struct spdk_bdev *bdev;
	uint64_t ticks_mhz = spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
	struct spdk_uuid ns_uuid;
	int rc = 0;

	spdk_uuid_parse(&ns_uuid, BDEV_DELAY_NAMESPACE_UUID);

	/* Check our list of names from config versus this bdev and if
	 * there's a match, create the delay_node & bdev accordingly.
	 */
	TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
		if (strcmp(assoc->bdev_name, bdev_name) != 0) {
			continue;
		}

		delay_node = calloc(1, sizeof(struct vbdev_delay));
		if (!delay_node) {
			rc = -ENOMEM;
			SPDK_ERRLOG("could not allocate delay_node\n");
			break;
		}
		delay_node->delay_bdev.name = strdup(assoc->vbdev_name);
		if (!delay_node->delay_bdev.name) {
			rc = -ENOMEM;
			SPDK_ERRLOG("could not allocate delay_bdev name\n");
			free(delay_node);
			break;
		}
		delay_node->delay_bdev.product_name = "delay";

		/* The base bdev that we're attaching to. */
		rc = spdk_bdev_open_ext(bdev_name, true, vbdev_delay_base_bdev_event_cb,
					NULL, &delay_node->base_desc);
		if (rc) {
			if (rc != -ENODEV) {
				SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
			}
			free(delay_node->delay_bdev.name);
			free(delay_node);
			break;
		}

		bdev = spdk_bdev_desc_get_bdev(delay_node->base_desc);
		delay_node->base_bdev = bdev;

		delay_node->delay_bdev.write_cache = bdev->write_cache;
		delay_node->delay_bdev.required_alignment = bdev->required_alignment;
		delay_node->delay_bdev.optimal_io_boundary = bdev->optimal_io_boundary;
		delay_node->delay_bdev.blocklen = bdev->blocklen;
		delay_node->delay_bdev.blockcnt = bdev->blockcnt;

		delay_node->delay_bdev.ctxt = delay_node;
		delay_node->delay_bdev.fn_table = &vbdev_delay_fn_table;
		delay_node->delay_bdev.module = &delay_if;

		/* Store the number of ticks to add to get the I/O expiration time. */
		delay_node->average_read_latency_ticks = ticks_mhz * assoc->avg_read_latency;
		delay_node->p99_read_latency_ticks = ticks_mhz * assoc->p99_read_latency;
		delay_node->average_write_latency_ticks = ticks_mhz * assoc->avg_write_latency;
		delay_node->p99_write_latency_ticks = ticks_mhz * assoc->p99_write_latency;

		if (spdk_uuid_is_null(&assoc->uuid)) {
			/* Generate a UUID based on the namespace UUID + the base bdev UUID. */
			rc = spdk_uuid_generate_sha1(&delay_node->delay_bdev.uuid, &ns_uuid,
						     (const char *)&bdev->uuid, sizeof(struct spdk_uuid));
			if (rc) {
				spdk_bdev_close(delay_node->base_desc);
				free(delay_node->delay_bdev.name);
				free(delay_node);
				break;
			}
		} else {
			spdk_uuid_copy(&delay_node->delay_bdev.uuid, &assoc->uuid);
		}

		spdk_io_device_register(delay_node, delay_bdev_ch_create_cb, delay_bdev_ch_destroy_cb,
					sizeof(struct delay_io_channel),
					assoc->vbdev_name);

		/* Save the thread where the base device is opened. */
		delay_node->thread = spdk_get_thread();

		rc = spdk_bdev_module_claim_bdev(bdev, delay_node->base_desc, delay_node->delay_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", bdev_name);
			goto error_close;
		}

		rc = spdk_bdev_register(&delay_node->delay_bdev);
		if (rc) {
			SPDK_ERRLOG("could not register delay_bdev\n");
			spdk_bdev_module_release_bdev(delay_node->base_bdev);
			goto error_close;
		}

		TAILQ_INSERT_TAIL(&g_delay_nodes, delay_node, link);
	}

	return rc;

error_close:
	spdk_bdev_close(delay_node->base_desc);
	spdk_io_device_unregister(delay_node, NULL);
	free(delay_node->delay_bdev.name);
	free(delay_node);
	return rc;
}

int
create_delay_disk(const char *bdev_name, const char *vbdev_name, struct spdk_uuid *uuid,
		  uint64_t avg_read_latency,
		  uint64_t p99_read_latency, uint64_t avg_write_latency, uint64_t p99_write_latency)
{
	int rc = 0;

	if (p99_read_latency < avg_read_latency || p99_write_latency < avg_write_latency) {
		SPDK_ERRLOG("Unable to create a delay bdev where p99 latency is less than average latency.\n");
		return -EINVAL;
	}

	rc = vbdev_delay_insert_association(bdev_name, vbdev_name, uuid, avg_read_latency, p99_read_latency,
					    avg_write_latency, p99_write_latency);
	if (rc) {
		return rc;
	}

	rc = vbdev_delay_register(bdev_name);
	if (rc == -ENODEV) {
		/* This is not an error; we tracked the name above and it still
		 * may show up later.
		 */
		SPDK_NOTICELOG("vbdev creation deferred pending base bdev arrival\n");
		rc = 0;
	}

	return rc;
}
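
/* A minimal creation sketch (bdev names and latency values are illustrative, matching the
 * bdev_delay_create RPC params emitted by vbdev_delay_config_json above); latencies are
 * given in microseconds:
 *
 *   create_delay_disk("Malloc0", "Delay0", NULL,   // NULL uuid: module derives one
 *                     100,    // avg_read_latency
 *                     1000,   // p99_read_latency
 *                     200,    // avg_write_latency
 *                     2000);  // p99_write_latency
 */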

void
delete_delay_disk(const char *vbdev_name, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct bdev_association *assoc;
	int rc;

	rc = spdk_bdev_unregister_by_name(vbdev_name, &delay_if, cb_fn, cb_arg);
	if (rc == 0) {
		TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
			if (strcmp(assoc->vbdev_name, vbdev_name) == 0) {
				TAILQ_REMOVE(&g_bdev_associations, assoc, link);
				free(assoc->bdev_name);
				free(assoc->vbdev_name);
				free(assoc);
				break;
			}
		}
	} else {
		cb_fn(cb_arg, rc);
	}
}

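/* examine_config runs for every bdev registered with the bdev layer. Registration is
 * attempted synchronously against the association list, so examine_done can be signaled
 * immediately afterwards.
 */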
static void
vbdev_delay_examine(struct spdk_bdev *bdev)
{
	vbdev_delay_register(bdev->name);

	spdk_bdev_module_examine_done(&delay_if);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_delay)
