xref: /spdk/lib/blob/request.c (revision c64ce716e459a0e90f3c81b7e58b814cb159a048)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "blobstore.h"
10 #include "request.h"
11 
12 #include "spdk/thread.h"
13 #include "spdk/queue.h"
14 
15 #include "spdk/log.h"
16 
17 void
18 bs_call_cpl(struct spdk_bs_cpl *cpl, int bserrno)
19 {
20 	switch (cpl->type) {
21 	case SPDK_BS_CPL_TYPE_BS_BASIC:
22 		cpl->u.bs_basic.cb_fn(cpl->u.bs_basic.cb_arg,
23 				      bserrno);
24 		break;
25 	case SPDK_BS_CPL_TYPE_BS_HANDLE:
26 		cpl->u.bs_handle.cb_fn(cpl->u.bs_handle.cb_arg,
27 				       bserrno == 0 ? cpl->u.bs_handle.bs : NULL,
28 				       bserrno);
29 		break;
30 	case SPDK_BS_CPL_TYPE_BLOB_BASIC:
31 		cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg,
32 					bserrno);
33 		break;
34 	case SPDK_BS_CPL_TYPE_BLOBID:
35 		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg,
36 				    bserrno == 0 ? cpl->u.blobid.blobid : SPDK_BLOBID_INVALID,
37 				    bserrno);
38 		break;
39 	case SPDK_BS_CPL_TYPE_BLOB_HANDLE:
40 		cpl->u.blob_handle.cb_fn(cpl->u.blob_handle.cb_arg,
41 					 bserrno == 0 ? cpl->u.blob_handle.blob : NULL,
42 					 bserrno);
43 		break;
44 	case SPDK_BS_CPL_TYPE_NESTED_SEQUENCE:
45 		cpl->u.nested_seq.cb_fn(cpl->u.nested_seq.cb_arg,
46 					cpl->u.nested_seq.parent,
47 					bserrno);
48 		break;
49 	case SPDK_BS_CPL_TYPE_NONE:
50 		/* this completion's callback is handled elsewhere */
51 		break;
52 	}
53 }
54 
55 static void
56 bs_request_set_complete(struct spdk_bs_request_set *set)
57 {
58 	struct spdk_bs_cpl cpl = set->cpl;
59 	int bserrno = set->bserrno;
60 
61 	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
62 
63 	bs_call_cpl(&cpl, bserrno);
64 }
65 
66 static void
67 bs_sequence_completion(struct spdk_io_channel *channel, void *cb_arg, int bserrno)
68 {
69 	struct spdk_bs_request_set *set = cb_arg;
70 
71 	set->bserrno = bserrno;
72 	set->u.sequence.cb_fn((spdk_bs_sequence_t *)set, set->u.sequence.cb_arg, bserrno);
73 }
74 
75 static spdk_bs_sequence_t *
76 bs_sequence_start(struct spdk_io_channel *_channel,
77 		  struct spdk_bs_cpl *cpl)
78 {
79 	struct spdk_bs_channel		*channel;
80 	struct spdk_bs_request_set	*set;
81 
82 	channel = spdk_io_channel_get_ctx(_channel);
83 	assert(channel != NULL);
84 	set = TAILQ_FIRST(&channel->reqs);
85 	if (!set) {
86 		return NULL;
87 	}
88 	TAILQ_REMOVE(&channel->reqs, set, link);
89 
90 	set->cpl = *cpl;
91 	set->bserrno = 0;
92 	set->channel = channel;
93 	set->back_channel = _channel;
94 
95 	set->cb_args.cb_fn = bs_sequence_completion;
96 	set->cb_args.cb_arg = set;
97 	set->cb_args.channel = channel->dev_channel;
98 	set->ext_io_opts = NULL;
99 
100 	return (spdk_bs_sequence_t *)set;
101 }
102 
103 /* Use when performing IO directly on the blobstore (e.g. metadata - not a blob). */
104 spdk_bs_sequence_t *
105 bs_sequence_start_bs(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl)
106 {
107 	return bs_sequence_start(_channel, cpl);
108 }
109 
110 /* Use when performing IO on a blob. */
111 spdk_bs_sequence_t *
112 bs_sequence_start_blob(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl,
113 		       struct spdk_blob *blob)
114 {
115 	return bs_sequence_start(_channel, cpl);
116 }
117 
118 void
119 bs_sequence_read_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
120 			void *payload, uint64_t lba, uint32_t lba_count,
121 			spdk_bs_sequence_cpl cb_fn, void *cb_arg)
122 {
123 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
124 	struct spdk_io_channel		*back_channel = set->back_channel;
125 
126 	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
127 		      lba);
128 
129 	set->u.sequence.cb_fn = cb_fn;
130 	set->u.sequence.cb_arg = cb_arg;
131 
132 	bs_dev->read(bs_dev, back_channel, payload, lba, lba_count, &set->cb_args);
133 }
134 
135 void
136 bs_sequence_read_dev(spdk_bs_sequence_t *seq, void *payload,
137 		     uint64_t lba, uint32_t lba_count,
138 		     spdk_bs_sequence_cpl cb_fn, void *cb_arg)
139 {
140 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
141 	struct spdk_bs_channel       *channel = set->channel;
142 
143 	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
144 		      lba);
145 
146 	set->u.sequence.cb_fn = cb_fn;
147 	set->u.sequence.cb_arg = cb_arg;
148 
149 	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
150 }
151 
152 void
153 bs_sequence_write_dev(spdk_bs_sequence_t *seq, void *payload,
154 		      uint64_t lba, uint32_t lba_count,
155 		      spdk_bs_sequence_cpl cb_fn, void *cb_arg)
156 {
157 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
158 	struct spdk_bs_channel       *channel = set->channel;
159 
160 	SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
161 		      lba);
162 
163 	set->u.sequence.cb_fn = cb_fn;
164 	set->u.sequence.cb_arg = cb_arg;
165 
166 	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
167 			    &set->cb_args);
168 }
169 
170 void
171 bs_sequence_readv_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
172 			 struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
173 			 spdk_bs_sequence_cpl cb_fn, void *cb_arg)
174 {
175 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
176 	struct spdk_io_channel		*back_channel = set->back_channel;
177 
178 	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
179 		      lba);
180 
181 	set->u.sequence.cb_fn = cb_fn;
182 	set->u.sequence.cb_arg = cb_arg;
183 
184 	if (set->ext_io_opts) {
185 		assert(bs_dev->readv_ext);
186 		bs_dev->readv_ext(bs_dev, back_channel, iov, iovcnt, lba, lba_count,
187 				  &set->cb_args, set->ext_io_opts);
188 	} else {
189 		bs_dev->readv(bs_dev, back_channel, iov, iovcnt, lba, lba_count, &set->cb_args);
190 	}
191 }
192 
193 void
194 bs_sequence_readv_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
195 		      uint64_t lba, uint32_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
196 {
197 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
198 	struct spdk_bs_channel       *channel = set->channel;
199 
200 	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
201 		      lba);
202 
203 	set->u.sequence.cb_fn = cb_fn;
204 	set->u.sequence.cb_arg = cb_arg;
205 	if (set->ext_io_opts) {
206 		assert(channel->dev->readv_ext);
207 		channel->dev->readv_ext(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
208 					&set->cb_args, set->ext_io_opts);
209 	} else {
210 		channel->dev->readv(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count, &set->cb_args);
211 	}
212 }
213 
214 void
215 bs_sequence_writev_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
216 		       uint64_t lba, uint32_t lba_count,
217 		       spdk_bs_sequence_cpl cb_fn, void *cb_arg)
218 {
219 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
220 	struct spdk_bs_channel       *channel = set->channel;
221 
222 	SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
223 		      lba);
224 
225 	set->u.sequence.cb_fn = cb_fn;
226 	set->u.sequence.cb_arg = cb_arg;
227 
228 	if (set->ext_io_opts) {
229 		assert(channel->dev->writev_ext);
230 		channel->dev->writev_ext(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
231 					 &set->cb_args, set->ext_io_opts);
232 	} else {
233 		channel->dev->writev(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
234 				     &set->cb_args);
235 	}
236 }
237 
238 void
239 bs_sequence_write_zeroes_dev(spdk_bs_sequence_t *seq,
240 			     uint64_t lba, uint64_t lba_count,
241 			     spdk_bs_sequence_cpl cb_fn, void *cb_arg)
242 {
243 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
244 	struct spdk_bs_channel       *channel = set->channel;
245 
246 	SPDK_DEBUGLOG(blob_rw, "writing zeroes to %" PRIu64 " blocks at LBA %" PRIu64 "\n",
247 		      lba_count, lba);
248 
249 	set->u.sequence.cb_fn = cb_fn;
250 	set->u.sequence.cb_arg = cb_arg;
251 
252 	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
253 				   &set->cb_args);
254 }
255 
256 void
257 bs_sequence_copy_dev(spdk_bs_sequence_t *seq, uint64_t dst_lba, uint64_t src_lba,
258 		     uint64_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
259 {
260 	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq;
261 	struct spdk_bs_channel     *channel = set->channel;
262 
263 	SPDK_DEBUGLOG(blob_rw, "Copying %" PRIu64 " blocks from LBA %" PRIu64 " to LBA %" PRIu64 "\n",
264 		      lba_count, src_lba, dst_lba);
265 
266 	set->u.sequence.cb_fn = cb_fn;
267 	set->u.sequence.cb_arg = cb_arg;
268 
269 	channel->dev->copy(channel->dev, channel->dev_channel, dst_lba, src_lba, lba_count, &set->cb_args);
270 }
271 
272 void
273 bs_sequence_finish(spdk_bs_sequence_t *seq, int bserrno)
274 {
275 	if (bserrno != 0) {
276 		seq->bserrno = bserrno;
277 	}
278 	bs_request_set_complete((struct spdk_bs_request_set *)seq);
279 }
280 
281 void
282 bs_user_op_sequence_finish(void *cb_arg, int bserrno)
283 {
284 	spdk_bs_sequence_t *seq = cb_arg;
285 
286 	bs_sequence_finish(seq, bserrno);
287 }
288 
289 static void
290 bs_batch_completion(struct spdk_io_channel *_channel,
291 		    void *cb_arg, int bserrno)
292 {
293 	struct spdk_bs_request_set	*set = cb_arg;
294 
295 	set->u.batch.outstanding_ops--;
296 	if (bserrno != 0) {
297 		set->bserrno = bserrno;
298 	}
299 
300 	if (set->u.batch.outstanding_ops == 0 && set->u.batch.batch_closed) {
301 		if (set->u.batch.cb_fn) {
302 			set->cb_args.cb_fn = bs_sequence_completion;
303 			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, bserrno);
304 		} else {
305 			bs_request_set_complete(set);
306 		}
307 	}
308 }
309 
310 spdk_bs_batch_t *
311 bs_batch_open(struct spdk_io_channel *_channel,
312 	      struct spdk_bs_cpl *cpl)
313 {
314 	struct spdk_bs_channel		*channel;
315 	struct spdk_bs_request_set	*set;
316 
317 	channel = spdk_io_channel_get_ctx(_channel);
318 	assert(channel != NULL);
319 	set = TAILQ_FIRST(&channel->reqs);
320 	if (!set) {
321 		return NULL;
322 	}
323 	TAILQ_REMOVE(&channel->reqs, set, link);
324 
325 	set->cpl = *cpl;
326 	set->bserrno = 0;
327 	set->channel = channel;
328 	set->back_channel = _channel;
329 
330 	set->u.batch.cb_fn = NULL;
331 	set->u.batch.cb_arg = NULL;
332 	set->u.batch.outstanding_ops = 0;
333 	set->u.batch.batch_closed = 0;
334 
335 	set->cb_args.cb_fn = bs_batch_completion;
336 	set->cb_args.cb_arg = set;
337 	set->cb_args.channel = channel->dev_channel;
338 
339 	return (spdk_bs_batch_t *)set;
340 }
341 
342 void
343 bs_batch_read_bs_dev(spdk_bs_batch_t *batch, struct spdk_bs_dev *bs_dev,
344 		     void *payload, uint64_t lba, uint32_t lba_count)
345 {
346 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
347 	struct spdk_io_channel		*back_channel = set->back_channel;
348 
349 	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
350 		      lba);
351 
352 	set->u.batch.outstanding_ops++;
353 	bs_dev->read(bs_dev, back_channel, payload, lba, lba_count, &set->cb_args);
354 }
355 
356 void
357 bs_batch_read_dev(spdk_bs_batch_t *batch, void *payload,
358 		  uint64_t lba, uint32_t lba_count)
359 {
360 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
361 	struct spdk_bs_channel		*channel = set->channel;
362 
363 	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
364 		      lba);
365 
366 	set->u.batch.outstanding_ops++;
367 	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
368 }
369 
370 void
371 bs_batch_write_dev(spdk_bs_batch_t *batch, void *payload,
372 		   uint64_t lba, uint32_t lba_count)
373 {
374 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
375 	struct spdk_bs_channel		*channel = set->channel;
376 
377 	SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks to LBA %" PRIu64 "\n", lba_count, lba);
378 
379 	set->u.batch.outstanding_ops++;
380 	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
381 			    &set->cb_args);
382 }
383 
384 void
385 bs_batch_unmap_dev(spdk_bs_batch_t *batch,
386 		   uint64_t lba, uint64_t lba_count)
387 {
388 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
389 	struct spdk_bs_channel		*channel = set->channel;
390 
391 	SPDK_DEBUGLOG(blob_rw, "Unmapping %" PRIu64 " blocks at LBA %" PRIu64 "\n", lba_count,
392 		      lba);
393 
394 	set->u.batch.outstanding_ops++;
395 	channel->dev->unmap(channel->dev, channel->dev_channel, lba, lba_count,
396 			    &set->cb_args);
397 }
398 
399 void
400 bs_batch_write_zeroes_dev(spdk_bs_batch_t *batch,
401 			  uint64_t lba, uint64_t lba_count)
402 {
403 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
404 	struct spdk_bs_channel		*channel = set->channel;
405 
406 	SPDK_DEBUGLOG(blob_rw, "Zeroing %" PRIu64 " blocks at LBA %" PRIu64 "\n", lba_count, lba);
407 
408 	set->u.batch.outstanding_ops++;
409 	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
410 				   &set->cb_args);
411 }
412 
413 void
414 bs_batch_close(spdk_bs_batch_t *batch)
415 {
416 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
417 
418 	set->u.batch.batch_closed = 1;
419 
420 	if (set->u.batch.outstanding_ops == 0) {
421 		if (set->u.batch.cb_fn) {
422 			set->cb_args.cb_fn = bs_sequence_completion;
423 			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, set->bserrno);
424 		} else {
425 			bs_request_set_complete(set);
426 		}
427 	}
428 }
429 
430 spdk_bs_batch_t *
431 bs_sequence_to_batch(spdk_bs_sequence_t *seq, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
432 {
433 	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq;
434 
435 	set->u.batch.cb_fn = cb_fn;
436 	set->u.batch.cb_arg = cb_arg;
437 	set->u.batch.outstanding_ops = 0;
438 	set->u.batch.batch_closed = 0;
439 
440 	set->cb_args.cb_fn = bs_batch_completion;
441 
442 	return set;
443 }
444 
445 spdk_bs_user_op_t *
446 bs_user_op_alloc(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl,
447 		 enum spdk_blob_op_type op_type, struct spdk_blob *blob,
448 		 void *payload, int iovcnt, uint64_t offset, uint64_t length)
449 {
450 	struct spdk_bs_channel		*channel;
451 	struct spdk_bs_request_set	*set;
452 	struct spdk_bs_user_op_args	*args;
453 
454 	channel = spdk_io_channel_get_ctx(_channel);
455 	assert(channel != NULL);
456 	set = TAILQ_FIRST(&channel->reqs);
457 	if (!set) {
458 		return NULL;
459 	}
460 	TAILQ_REMOVE(&channel->reqs, set, link);
461 
462 	set->cpl = *cpl;
463 	set->channel = channel;
464 	set->back_channel = NULL;
465 	set->ext_io_opts = NULL;
466 
467 	args = &set->u.user_op;
468 
469 	args->type = op_type;
470 	args->iovcnt = iovcnt;
471 	args->blob = blob;
472 	args->offset = offset;
473 	args->length = length;
474 	args->payload = payload;
475 
476 	return (spdk_bs_user_op_t *)set;
477 }
478 
479 void
480 bs_user_op_execute(spdk_bs_user_op_t *op)
481 {
482 	struct spdk_bs_request_set	*set;
483 	struct spdk_bs_user_op_args	*args;
484 	struct spdk_io_channel		*ch;
485 
486 	set = (struct spdk_bs_request_set *)op;
487 	args = &set->u.user_op;
488 	ch = spdk_io_channel_from_ctx(set->channel);
489 
490 	switch (args->type) {
491 	case SPDK_BLOB_READ:
492 		spdk_blob_io_read(args->blob, ch, args->payload, args->offset, args->length,
493 				  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
494 		break;
495 	case SPDK_BLOB_WRITE:
496 		spdk_blob_io_write(args->blob, ch, args->payload, args->offset, args->length,
497 				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
498 		break;
499 	case SPDK_BLOB_UNMAP:
500 		spdk_blob_io_unmap(args->blob, ch, args->offset, args->length,
501 				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
502 		break;
503 	case SPDK_BLOB_WRITE_ZEROES:
504 		spdk_blob_io_write_zeroes(args->blob, ch, args->offset, args->length,
505 					  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
506 		break;
507 	case SPDK_BLOB_READV:
508 		spdk_blob_io_readv_ext(args->blob, ch, args->payload, args->iovcnt,
509 				       args->offset, args->length,
510 				       set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg,
511 				       set->ext_io_opts);
512 		break;
513 	case SPDK_BLOB_WRITEV:
514 		spdk_blob_io_writev_ext(args->blob, ch, args->payload, args->iovcnt,
515 					args->offset, args->length,
516 					set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg,
517 					set->ext_io_opts);
518 		break;
519 	}
520 	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
521 }
522 
523 void
524 bs_user_op_abort(spdk_bs_user_op_t *op, int bserrno)
525 {
526 	struct spdk_bs_request_set	*set;
527 
528 	set = (struct spdk_bs_request_set *)op;
529 
530 	set->cpl.u.blob_basic.cb_fn(set->cpl.u.blob_basic.cb_arg, bserrno);
531 	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
532 }
533 
/* Register the "blob_rw" log component named by the SPDK_DEBUGLOG() calls in this file. */
SPDK_LOG_REGISTER_COMPONENT(blob_rw)
535