xref: /spdk/module/bdev/delay/vbdev_delay.c (revision 838e61c3772fdefb17e1a0b8f9880e2bcb9c4c0d)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2019 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "vbdev_delay.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"


static int vbdev_delay_init(void);
static int vbdev_delay_get_ctx_size(void);
static void vbdev_delay_examine(struct spdk_bdev *bdev);
static void vbdev_delay_finish(void);
static int vbdev_delay_config_json(struct spdk_json_write_ctx *w);

static struct spdk_bdev_module delay_if = {
	.name = "delay",
	.module_init = vbdev_delay_init,
	.get_ctx_size = vbdev_delay_get_ctx_size,
	.examine_config = vbdev_delay_examine,
	.module_fini = vbdev_delay_finish,
	.config_json = vbdev_delay_config_json
};

SPDK_BDEV_MODULE_REGISTER(delay, &delay_if)

/* List of bdev/vbdev associations, consulted during examine to create delay
 * vbdevs whose base bdev had not appeared yet.
 */
struct bdev_association {
	char			*vbdev_name;
	char			*bdev_name;
	struct spdk_uuid	uuid;
	uint64_t		avg_read_latency;
	uint64_t		p99_read_latency;
	uint64_t		avg_write_latency;
	uint64_t		p99_write_latency;
	TAILQ_ENTRY(bdev_association)	link;
};
static TAILQ_HEAD(, bdev_association) g_bdev_associations = TAILQ_HEAD_INITIALIZER(
			g_bdev_associations);

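/* Illustrative example (not part of this file): associations are normally
 * created by the bdev_delay_create RPC, which may arrive before the base bdev
 * exists. A JSON-RPC request along these lines populates one entry
 * (latencies in microseconds):
 *
 *   { "method": "bdev_delay_create",
 *     "params": { "base_bdev_name": "Malloc0", "name": "Delay0",
 *                 "avg_read_latency": 100, "p99_read_latency": 1000,
 *                 "avg_write_latency": 100, "p99_write_latency": 1000 } }
 */
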
/* List of virtual bdevs and associated info for each. */
struct vbdev_delay {
	struct spdk_bdev		*base_bdev; /* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc; /* its descriptor we get from open */
	struct spdk_bdev		delay_bdev;    /* the delay virtual bdev */
	uint64_t			average_read_latency_ticks; /* the average read delay */
	uint64_t			p99_read_latency_ticks; /* the p99 read delay */
	uint64_t			average_write_latency_ticks; /* the average write delay */
	uint64_t			p99_write_latency_ticks; /* the p99 write delay */
	TAILQ_ENTRY(vbdev_delay)	link;
	struct spdk_thread		*thread;    /* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_delay) g_delay_nodes = TAILQ_HEAD_INITIALIZER(g_delay_nodes);

struct delay_bdev_io {
	int status;

	uint64_t completion_tick;

	enum delay_io_type type;

	struct spdk_io_channel *ch;

	struct spdk_bdev_io_wait_entry bdev_io_wait;

	struct spdk_bdev_io *zcopy_bdev_io;

	STAILQ_ENTRY(delay_bdev_io) link;
};

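/* A note on context recovery (illustrative): this struct occupies the
 * driver_ctx region the bdev layer reserves in every spdk_bdev_io for this
 * module (sized by vbdev_delay_get_ctx_size() below), so throughout this file
 * it is recovered with a plain cast:
 *
 *   struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
 */
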
struct delay_io_channel {
	struct spdk_io_channel	*base_ch; /* IO channel of base device */
	STAILQ_HEAD(, delay_bdev_io) avg_read_io;
	STAILQ_HEAD(, delay_bdev_io) p99_read_io;
	STAILQ_HEAD(, delay_bdev_io) avg_write_io;
	STAILQ_HEAD(, delay_bdev_io) p99_write_io;
	struct spdk_poller *io_poller;
	unsigned int rand_seed;
};

static void vbdev_delay_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);


/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_delay *delay_node = io_device;

	/* Done with this delay_node. */
	free(delay_node->delay_bdev.name);
	free(delay_node);
}

static void
_vbdev_delay_destruct(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

static int
vbdev_delay_destruct(void *ctx)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	/* It is important to follow this exact sequence of steps for destroying
	 * a vbdev...
	 */

	TAILQ_REMOVE(&g_delay_nodes, delay_node, link);

	/* Unclaim the underlying bdev. */
	spdk_bdev_module_release_bdev(delay_node->base_bdev);

	/* Close the underlying bdev on the same thread that opened it. */
	if (delay_node->thread && delay_node->thread != spdk_get_thread()) {
		spdk_thread_send_msg(delay_node->thread, _vbdev_delay_destruct, delay_node->base_desc);
	} else {
		spdk_bdev_close(delay_node->base_desc);
	}

	/* Unregister the io_device. */
	spdk_io_device_unregister(delay_node, _device_unregister_cb);

	return 0;
}

static int
_process_io_stailq(void *arg, uint64_t ticks)
{
	STAILQ_HEAD(, delay_bdev_io) *head = arg;
	struct delay_bdev_io *io_ctx, *tmp;
	int completions = 0;

	STAILQ_FOREACH_SAFE(io_ctx, head, link, tmp) {
		if (io_ctx->completion_tick <= ticks) {
			STAILQ_REMOVE(head, io_ctx, delay_bdev_io, link);
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(io_ctx), io_ctx->status);
			completions++;
		} else {
			/* In the general case, I/O becomes ready in FIFO order. When latencies are
			 * changed dynamically this is not necessarily true, but normal behavior is
			 * restored once the I/O outstanding at the time of the change has completed.
			 * Moving from a high to a low latency therefore creates a dam for the I/O
			 * submitted after the change. This is considered desirable for the use case
			 * where we are trying to trigger a pre-defined timeout on an initiator.
			 */
			break;
		}
	}

	return completions;
}

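/* Worked example (illustrative numbers): with spdk_get_ticks_hz() returning
 * 2.3 GHz, a 100 us delay is stored as 100 * (2300000000 / 1000000) = 230000
 * ticks, and an I/O queued at tick T is completed by the poller on the first
 * pass where spdk_get_ticks() >= T + 230000.
 */
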
static int
_delay_finish_io(void *arg)
{
	struct delay_io_channel *delay_ch = arg;
	uint64_t ticks = spdk_get_ticks();
	int completions = 0;

	completions += _process_io_stailq(&delay_ch->avg_read_io, ticks);
	completions += _process_io_stailq(&delay_ch->avg_write_io, ticks);
	completions += _process_io_stailq(&delay_ch->p99_read_io, ticks);
	completions += _process_io_stailq(&delay_ch->p99_write_io, ticks);

	return completions == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

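/* Note (illustrative): the poller driving _delay_finish_io() is registered
 * with a period of 0 in delay_bdev_ch_create_cb() below, so it runs on every
 * iteration of the owning SPDK thread; returning SPDK_POLLER_IDLE when nothing
 * completed lets the framework account the thread as idle.
 */
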
/* Completion callback for I/O that was issued from this bdev. The original bdev_io
 * is passed in as an arg so we'll complete that one with the appropriate status
 * and then free the one that this module issued.
 */
static void
_delay_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *orig_io = cb_arg;
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(orig_io->bdev, struct vbdev_delay, delay_bdev);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)orig_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);

	io_ctx->status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_ZCOPY && bdev_io->u.bdev.zcopy.start && success) {
		io_ctx->zcopy_bdev_io = bdev_io;
	} else {
		assert(io_ctx->zcopy_bdev_io == NULL || io_ctx->zcopy_bdev_io == bdev_io);
		io_ctx->zcopy_bdev_io = NULL;
		spdk_bdev_free_io(bdev_io);
	}

	/* Put the I/O into the proper list for processing by the channel poller. */
	switch (io_ctx->type) {
	case DELAY_AVG_READ:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->average_read_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->avg_read_io, io_ctx, link);
		break;
	case DELAY_AVG_WRITE:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->average_write_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->avg_write_io, io_ctx, link);
		break;
	case DELAY_P99_READ:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->p99_read_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->p99_read_io, io_ctx, link);
		break;
	case DELAY_P99_WRITE:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->p99_write_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->p99_write_io, io_ctx, link);
		break;
	case DELAY_NONE:
	default:
		spdk_bdev_io_complete(orig_io, io_ctx->status);
		break;
	}
}

static void
vbdev_delay_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;

	vbdev_delay_submit_request(io_ctx->ch, bdev_io);
}

static void
vbdev_delay_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_delay_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, delay_ch->base_ch, &io_ctx->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to queue I/O in vbdev_delay_queue_io, rc=%d.\n", rc);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
delay_init_ext_io_opts(struct spdk_bdev_io *bdev_io, struct spdk_bdev_ext_io_opts *opts)
{
	memset(opts, 0, sizeof(*opts));
	opts->size = sizeof(*opts);
	opts->memory_domain = bdev_io->u.bdev.memory_domain;
	opts->memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
	opts->metadata = bdev_io->u.bdev.md_buf;
}

static void
delay_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_delay,
					 delay_bdev);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);
	struct spdk_bdev_ext_io_opts io_opts;
	int rc;

	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	delay_init_ext_io_opts(bdev_io, &io_opts);
	rc = spdk_bdev_readv_blocks_ext(delay_node->base_desc, delay_ch->base_ch, bdev_io->u.bdev.iovs,
					bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
					bdev_io->u.bdev.num_blocks, _delay_complete_io,
					bdev_io, &io_opts);

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory available, queueing I/O for retry.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
vbdev_delay_reset_dev(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);
	struct vbdev_delay *delay_node = spdk_io_channel_iter_get_io_device(i);
	int rc;

	rc = spdk_bdev_reset(delay_node->base_desc, delay_ch->base_ch,
			     _delay_complete_io, bdev_io);

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory available, queueing I/O for retry.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
abort_zcopy_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
}

static void
_abort_all_delayed_io(void *arg)
{
	STAILQ_HEAD(, delay_bdev_io) *head = arg;
	struct delay_bdev_io *io_ctx, *tmp;

	STAILQ_FOREACH_SAFE(io_ctx, head, link, tmp) {
		STAILQ_REMOVE(head, io_ctx, delay_bdev_io, link);
		if (io_ctx->zcopy_bdev_io != NULL) {
			spdk_bdev_zcopy_end(io_ctx->zcopy_bdev_io, false, abort_zcopy_io, NULL);
		}
		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(io_ctx), SPDK_BDEV_IO_STATUS_ABORTED);
	}
}

static void
vbdev_delay_reset_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);

	_abort_all_delayed_io(&delay_ch->avg_read_io);
	_abort_all_delayed_io(&delay_ch->avg_write_io);
	_abort_all_delayed_io(&delay_ch->p99_read_io);
	_abort_all_delayed_io(&delay_ch->p99_write_io);

	spdk_for_each_channel_continue(i, 0);
}

static bool
abort_delayed_io(void *_head, struct spdk_bdev_io *bio_to_abort)
{
	STAILQ_HEAD(, delay_bdev_io) *head = _head;
	struct delay_bdev_io *io_ctx_to_abort = (struct delay_bdev_io *)bio_to_abort->driver_ctx;
	struct delay_bdev_io *io_ctx;

	STAILQ_FOREACH(io_ctx, head, link) {
		if (io_ctx == io_ctx_to_abort) {
			STAILQ_REMOVE(head, io_ctx_to_abort, delay_bdev_io, link);
			if (io_ctx->zcopy_bdev_io != NULL) {
				spdk_bdev_zcopy_end(io_ctx->zcopy_bdev_io, false, abort_zcopy_io, NULL);
			}
			spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED);
			return true;
		}
	}

	return false;
}

static int
vbdev_delay_abort(struct vbdev_delay *delay_node, struct delay_io_channel *delay_ch,
		  struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

	if (abort_delayed_io(&delay_ch->avg_read_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->avg_write_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->p99_read_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->p99_write_io, bio_to_abort)) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
		return 0;
	}

	return spdk_bdev_abort(delay_node->base_desc, delay_ch->base_ch, bio_to_abort,
			       _delay_complete_io, bdev_io);
}

static void
vbdev_delay_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_delay, delay_bdev);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	struct spdk_bdev_ext_io_opts io_opts;
	int rc = 0;
	bool is_p99;

	/* Assign the p99 latency to roughly 1% of I/O; the rest gets the average. */
	is_p99 = rand_r(&delay_ch->rand_seed) % 100 == 0;

	io_ctx->ch = ch;
	io_ctx->type = DELAY_NONE;
	if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY || bdev_io->u.bdev.zcopy.start) {
		io_ctx->zcopy_bdev_io = NULL;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		io_ctx->type = is_p99 ? DELAY_P99_READ : DELAY_AVG_READ;
		spdk_bdev_io_get_buf(bdev_io, delay_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		io_ctx->type = is_p99 ? DELAY_P99_WRITE : DELAY_AVG_WRITE;
		delay_init_ext_io_opts(bdev_io, &io_opts);
		rc = spdk_bdev_writev_blocks_ext(delay_node->base_desc, delay_ch->base_ch, bdev_io->u.bdev.iovs,
						 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
						 bdev_io->u.bdev.num_blocks, _delay_complete_io,
						 bdev_io, &io_opts);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(delay_node->base_desc, delay_ch->base_ch,
						   bdev_io->u.bdev.offset_blocks,
						   bdev_io->u.bdev.num_blocks,
						   _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(delay_node->base_desc, delay_ch->base_ch,
					    bdev_io->u.bdev.offset_blocks,
					    bdev_io->u.bdev.num_blocks,
					    _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(delay_node->base_desc, delay_ch->base_ch,
					    bdev_io->u.bdev.offset_blocks,
					    bdev_io->u.bdev.num_blocks,
					    _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
		/* During reset, the generic bdev layer aborts all new I/O and queues all new resets.
		 * Hence we can simply abort all I/O that is currently being delayed.
		 */
		spdk_for_each_channel(delay_node, vbdev_delay_reset_channel, bdev_io,
				      vbdev_delay_reset_dev);
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = vbdev_delay_abort(delay_node, delay_ch, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.commit) {
			io_ctx->type = is_p99 ? DELAY_P99_WRITE : DELAY_AVG_WRITE;
		} else if (bdev_io->u.bdev.zcopy.populate) {
			io_ctx->type = is_p99 ? DELAY_P99_READ : DELAY_AVG_READ;
		}
		if (bdev_io->u.bdev.zcopy.start) {
			rc = spdk_bdev_zcopy_start(delay_node->base_desc, delay_ch->base_ch,
						   bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
						   bdev_io->u.bdev.offset_blocks,
						   bdev_io->u.bdev.num_blocks,
						   bdev_io->u.bdev.zcopy.populate,
						   _delay_complete_io, bdev_io);
		} else {
			rc = spdk_bdev_zcopy_end(io_ctx->zcopy_bdev_io, bdev_io->u.bdev.zcopy.commit,
						 _delay_complete_io, bdev_io);
		}
		break;
	default:
		SPDK_ERRLOG("delay: unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory available, queueing I/O for retry.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static bool
vbdev_delay_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	return spdk_bdev_io_type_supported(delay_node->base_bdev, io_type);
}

static struct spdk_io_channel *
vbdev_delay_get_io_channel(void *ctx)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	return spdk_get_io_channel(delay_node);
}

static void
_delay_write_conf_values(struct vbdev_delay *delay_node, struct spdk_json_write_ctx *w)
{
	struct spdk_uuid *uuid = &delay_node->delay_bdev.uuid;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&delay_node->delay_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(delay_node->base_bdev));
	if (!spdk_mem_all_zero(uuid, sizeof(*uuid))) {
		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), uuid);
		spdk_json_write_named_string(w, "uuid", uuid_str);
	}
	spdk_json_write_named_int64(w, "avg_read_latency",
				    delay_node->average_read_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "p99_read_latency",
				    delay_node->p99_read_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "avg_write_latency",
				    delay_node->average_write_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "p99_write_latency",
				    delay_node->p99_write_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

static int
vbdev_delay_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	spdk_json_write_name(w, "delay");
	spdk_json_write_object_begin(w);
	_delay_write_conf_values(delay_node, w);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_delay_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_delay *delay_node;

	TAILQ_FOREACH(delay_node, &g_delay_nodes, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_delay_create");
		spdk_json_write_named_object_begin(w, "params");
		_delay_write_conf_values(delay_node, w);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

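/* Example of the generated output (illustrative values): each delay bdev
 * yields one object that can be replayed at startup, e.g.
 *
 *   { "method": "bdev_delay_create",
 *     "params": { "name": "Delay0", "base_bdev_name": "Malloc0",
 *                 "avg_read_latency": 100, "p99_read_latency": 1000,
 *                 "avg_write_latency": 100, "p99_write_latency": 1000 } }
 */
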
/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. We also register
 * the per-channel poller that completes delayed I/O.
 */
static int
delay_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct delay_io_channel *delay_ch = ctx_buf;
	struct vbdev_delay *delay_node = io_device;

	STAILQ_INIT(&delay_ch->avg_read_io);
	STAILQ_INIT(&delay_ch->p99_read_io);
	STAILQ_INIT(&delay_ch->avg_write_io);
	STAILQ_INIT(&delay_ch->p99_write_io);

	delay_ch->io_poller = SPDK_POLLER_REGISTER(_delay_finish_io, delay_ch, 0);
	delay_ch->base_ch = spdk_bdev_get_io_channel(delay_node->base_desc);
	delay_ch->rand_seed = time(NULL);

	return 0;
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created: unregister the poller and release the base channel.
 */
static void
delay_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct delay_io_channel *delay_ch = ctx_buf;

	spdk_poller_unregister(&delay_ch->io_poller);
	spdk_put_io_channel(delay_ch->base_ch);
}

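/* Usage sketch (illustrative): the callbacks above are not called directly;
 * they run when a consumer gets or releases an I/O channel for the delay bdev,
 * e.g.
 *
 *   struct spdk_io_channel *ch = spdk_bdev_get_io_channel(delay_desc);
 *   ...
 *   spdk_put_io_channel(ch);
 *
 * where delay_desc is assumed to be a descriptor from spdk_bdev_open_ext() on
 * the delay bdev itself.
 */
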
/* Create the delay association from the bdev and vbdev name and insert
 * on the global list. */
static int
vbdev_delay_insert_association(const char *bdev_name, const char *vbdev_name,
			       struct spdk_uuid *uuid,
			       uint64_t avg_read_latency, uint64_t p99_read_latency,
			       uint64_t avg_write_latency, uint64_t p99_write_latency)
{
	struct bdev_association *assoc;

	TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
		if (strcmp(vbdev_name, assoc->vbdev_name) == 0) {
			SPDK_ERRLOG("delay bdev %s already exists\n", vbdev_name);
			return -EEXIST;
		}
	}

	assoc = calloc(1, sizeof(struct bdev_association));
	if (!assoc) {
		SPDK_ERRLOG("could not allocate bdev_association\n");
		return -ENOMEM;
	}

	assoc->bdev_name = strdup(bdev_name);
	if (!assoc->bdev_name) {
		SPDK_ERRLOG("could not allocate assoc->bdev_name\n");
		free(assoc);
		return -ENOMEM;
	}

	assoc->vbdev_name = strdup(vbdev_name);
	if (!assoc->vbdev_name) {
		SPDK_ERRLOG("could not allocate assoc->vbdev_name\n");
		free(assoc->bdev_name);
		free(assoc);
		return -ENOMEM;
	}

	assoc->avg_read_latency = avg_read_latency;
	assoc->p99_read_latency = p99_read_latency;
	assoc->avg_write_latency = avg_write_latency;
	assoc->p99_write_latency = p99_write_latency;

	if (uuid) {
		spdk_uuid_copy(&assoc->uuid, uuid);
	}

	TAILQ_INSERT_TAIL(&g_bdev_associations, assoc, link);

	return 0;
}

int
vbdev_delay_update_latency_value(char *delay_name, uint64_t latency_us, enum delay_io_type type)
{
	struct vbdev_delay *delay_node;
	uint64_t ticks_mhz = spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;

	TAILQ_FOREACH(delay_node, &g_delay_nodes, link) {
		if (strcmp(delay_node->delay_bdev.name, delay_name) == 0) {
			break;
		}
	}

	if (delay_node == NULL) {
		return -ENODEV;
	}

	switch (type) {
	case DELAY_AVG_READ:
		delay_node->average_read_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_AVG_WRITE:
		delay_node->average_write_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_P99_READ:
		delay_node->p99_read_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_P99_WRITE:
		delay_node->p99_write_latency_ticks = ticks_mhz * latency_us;
		break;
	default:
		return -EINVAL;
	}

	return 0;
}

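/* Example (illustrative name and values): setting a 150 us average read
 * latency on a delay bdev named "Delay0", as the bdev_delay_update_latency
 * RPC does:
 *
 *   rc = vbdev_delay_update_latency_value("Delay0", 150, DELAY_AVG_READ);
 *
 * With spdk_get_ticks_hz() == 2.3 GHz this stores 150 * 2300 = 345000 ticks.
 */
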
static int
vbdev_delay_init(void)
{
	/* Not allowing for .ini style configuration. */
	return 0;
}

static void
vbdev_delay_finish(void)
{
	struct bdev_association *assoc;

	while ((assoc = TAILQ_FIRST(&g_bdev_associations))) {
		TAILQ_REMOVE(&g_bdev_associations, assoc, link);
		free(assoc->bdev_name);
		free(assoc->vbdev_name);
		free(assoc);
	}
}

static int
vbdev_delay_get_ctx_size(void)
{
	return sizeof(struct delay_bdev_io);
}

static void
vbdev_delay_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	/* No per-bdev config needed */
}

static int
vbdev_delay_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	/* The delay bdev does not access data buffers itself, so it supports any
	 * memory domain used by base_bdev. */
	return spdk_bdev_get_memory_domains(delay_node->base_bdev, domains, array_size);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_delay_fn_table = {
	.destruct		= vbdev_delay_destruct,
	.submit_request		= vbdev_delay_submit_request,
	.io_type_supported	= vbdev_delay_io_type_supported,
	.get_io_channel		= vbdev_delay_get_io_channel,
	.dump_info_json		= vbdev_delay_dump_info_json,
	.write_config_json	= vbdev_delay_write_config_json,
	.get_memory_domains	= vbdev_delay_get_memory_domains,
};

static void
vbdev_delay_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_delay *delay_node, *tmp;

	TAILQ_FOREACH_SAFE(delay_node, &g_delay_nodes, link, tmp) {
		if (bdev_find == delay_node->base_bdev) {
			spdk_bdev_unregister(&delay_node->delay_bdev, NULL, NULL);
		}
	}
}

/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_delay_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
			       void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_delay_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* Create and register the delay vbdev if we find it in our list of bdev names.
 * This can be called either by the examine path or RPC method.
 */
static int
vbdev_delay_register(const char *bdev_name)
{
	struct bdev_association *assoc;
	struct vbdev_delay *delay_node;
	struct spdk_bdev *bdev;
	uint64_t ticks_mhz = spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
	int rc = 0;

	/* Check our list of names from config versus this bdev and if
	 * there's a match, create the delay_node & bdev accordingly.
	 */
	TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
		if (strcmp(assoc->bdev_name, bdev_name) != 0) {
			continue;
		}

		delay_node = calloc(1, sizeof(struct vbdev_delay));
		if (!delay_node) {
			rc = -ENOMEM;
			SPDK_ERRLOG("could not allocate delay_node\n");
			break;
		}
		delay_node->delay_bdev.name = strdup(assoc->vbdev_name);
		if (!delay_node->delay_bdev.name) {
			rc = -ENOMEM;
			SPDK_ERRLOG("could not allocate delay_bdev name\n");
			free(delay_node);
			break;
		}
		delay_node->delay_bdev.product_name = "delay";

		spdk_uuid_copy(&delay_node->delay_bdev.uuid, &assoc->uuid);

		/* The base bdev that we're attaching to. */
		rc = spdk_bdev_open_ext(bdev_name, true, vbdev_delay_base_bdev_event_cb,
					NULL, &delay_node->base_desc);
		if (rc) {
			if (rc != -ENODEV) {
				SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
			}
			free(delay_node->delay_bdev.name);
			free(delay_node);
			break;
		}

		bdev = spdk_bdev_desc_get_bdev(delay_node->base_desc);
		delay_node->base_bdev = bdev;

		delay_node->delay_bdev.write_cache = bdev->write_cache;
		delay_node->delay_bdev.required_alignment = bdev->required_alignment;
		delay_node->delay_bdev.optimal_io_boundary = bdev->optimal_io_boundary;
		delay_node->delay_bdev.blocklen = bdev->blocklen;
		delay_node->delay_bdev.blockcnt = bdev->blockcnt;

		delay_node->delay_bdev.ctxt = delay_node;
		delay_node->delay_bdev.fn_table = &vbdev_delay_fn_table;
		delay_node->delay_bdev.module = &delay_if;

		/* Store the number of ticks you need to add to get the I/O expiration time. */
		delay_node->average_read_latency_ticks = ticks_mhz * assoc->avg_read_latency;
		delay_node->p99_read_latency_ticks = ticks_mhz * assoc->p99_read_latency;
		delay_node->average_write_latency_ticks = ticks_mhz * assoc->avg_write_latency;
		delay_node->p99_write_latency_ticks = ticks_mhz * assoc->p99_write_latency;

		spdk_io_device_register(delay_node, delay_bdev_ch_create_cb, delay_bdev_ch_destroy_cb,
					sizeof(struct delay_io_channel),
					assoc->vbdev_name);

		/* Save the thread where the base device is opened */
		delay_node->thread = spdk_get_thread();

		rc = spdk_bdev_module_claim_bdev(bdev, delay_node->base_desc, delay_node->delay_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", bdev_name);
			goto error_close;
		}

		rc = spdk_bdev_register(&delay_node->delay_bdev);
		if (rc) {
			SPDK_ERRLOG("could not register delay_bdev\n");
			spdk_bdev_module_release_bdev(delay_node->base_bdev);
			goto error_close;
		}

		TAILQ_INSERT_TAIL(&g_delay_nodes, delay_node, link);
	}

	return rc;

error_close:
	spdk_bdev_close(delay_node->base_desc);
	spdk_io_device_unregister(delay_node, NULL);
	free(delay_node->delay_bdev.name);
	free(delay_node);
	return rc;
}

int
create_delay_disk(const char *bdev_name, const char *vbdev_name, struct spdk_uuid *uuid,
		  uint64_t avg_read_latency,
		  uint64_t p99_read_latency, uint64_t avg_write_latency, uint64_t p99_write_latency)
{
	int rc = 0;

	if (p99_read_latency < avg_read_latency || p99_write_latency < avg_write_latency) {
		SPDK_ERRLOG("Unable to create a delay bdev where p99 latency is less than average latency.\n");
		return -EINVAL;
	}

	rc = vbdev_delay_insert_association(bdev_name, vbdev_name, uuid, avg_read_latency, p99_read_latency,
					    avg_write_latency, p99_write_latency);
	if (rc) {
		return rc;
	}

	rc = vbdev_delay_register(bdev_name);
	if (rc == -ENODEV) {
		/* This is not an error; we tracked the name above and the base bdev
		 * may still show up later.
		 */
		SPDK_NOTICELOG("vbdev creation deferred pending base bdev arrival\n");
		rc = 0;
	}

	return rc;
}

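/* Example invocation (illustrative; flag spellings depend on the scripts/rpc.py
 * version in use):
 *
 *   scripts/rpc.py bdev_delay_create -b Malloc0 -d Delay0 \
 *       --avg-read-latency 100 --nine-nine-read-latency 1000 \
 *       --avg-write-latency 100 --nine-nine-write-latency 1000
 *
 * Latencies are in microseconds, and each p99 value must be at least the
 * corresponding average, or create_delay_disk() returns -EINVAL.
 */
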
void
delete_delay_disk(const char *vbdev_name, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct bdev_association *assoc;
	int rc;

	rc = spdk_bdev_unregister_by_name(vbdev_name, &delay_if, cb_fn, cb_arg);
	if (rc == 0) {
		TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
			if (strcmp(assoc->vbdev_name, vbdev_name) == 0) {
				TAILQ_REMOVE(&g_bdev_associations, assoc, link);
				free(assoc->bdev_name);
				free(assoc->vbdev_name);
				free(assoc);
				break;
			}
		}
	} else {
		cb_fn(cb_arg, rc);
	}
}

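/* Example (illustrative): deleting the vbdev via RPC lands here:
 *
 *   scripts/rpc.py bdev_delay_delete Delay0
 *
 * The association is removed as well, so the vbdev is not recreated if the
 * base bdev later reappears.
 */
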
static void
vbdev_delay_examine(struct spdk_bdev *bdev)
{
	vbdev_delay_register(bdev->name);

	spdk_bdev_module_examine_done(&delay_if);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_delay)
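
/* Debug aid (illustrative): the log component registered above can be enabled
 * at application startup with "-L vbdev_delay" or at runtime, e.g.
 *
 *   scripts/rpc.py log_set_flag vbdev_delay
 */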
921