xref: /spdk/lib/blob/request.c (revision 22898a91b9b6f289933db19b0175821cfb7e7820)
1  /*-
2   *   BSD LICENSE
3   *
4   *   Copyright (c) Intel Corporation.
5   *   All rights reserved.
6   *
7   *   Redistribution and use in source and binary forms, with or without
8   *   modification, are permitted provided that the following conditions
9   *   are met:
10   *
11   *     * Redistributions of source code must retain the above copyright
12   *       notice, this list of conditions and the following disclaimer.
13   *     * Redistributions in binary form must reproduce the above copyright
14   *       notice, this list of conditions and the following disclaimer in
15   *       the documentation and/or other materials provided with the
16   *       distribution.
17   *     * Neither the name of Intel Corporation nor the names of its
18   *       contributors may be used to endorse or promote products derived
19   *       from this software without specific prior written permission.
20   *
21   *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22   *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23   *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24   *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25   *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26   *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27   *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28   *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29   *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30   *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31   *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32   */
33  
34  #include "spdk/stdinc.h"
35  
36  #include "blobstore.h"
37  #include "request.h"
38  
39  #include "spdk/io_channel.h"
40  #include "spdk/queue.h"
41  
42  #include "spdk_internal/log.h"
43  
44  void
45  spdk_bs_call_cpl(struct spdk_bs_cpl *cpl, int bserrno)
46  {
47  	switch (cpl->type) {
48  	case SPDK_BS_CPL_TYPE_BS_BASIC:
49  		cpl->u.bs_basic.cb_fn(cpl->u.bs_basic.cb_arg,
50  				      bserrno);
51  		break;
52  	case SPDK_BS_CPL_TYPE_BS_HANDLE:
53  		cpl->u.bs_handle.cb_fn(cpl->u.bs_handle.cb_arg,
54  				       cpl->u.bs_handle.bs,
55  				       bserrno);
56  		break;
57  	case SPDK_BS_CPL_TYPE_BLOB_BASIC:
58  		cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg,
59  					bserrno);
60  		break;
61  	case SPDK_BS_CPL_TYPE_BLOBID:
62  		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg,
63  				    cpl->u.blobid.blobid,
64  				    bserrno);
65  		break;
66  	case SPDK_BS_CPL_TYPE_BLOB_HANDLE:
67  		cpl->u.blob_handle.cb_fn(cpl->u.blob_handle.cb_arg,
68  					 cpl->u.blob_handle.blob,
69  					 bserrno);
70  		break;
71  	case SPDK_BS_CPL_TYPE_NESTED_SEQUENCE:
72  		cpl->u.nested_seq.cb_fn(cpl->u.nested_seq.cb_arg,
73  					cpl->u.nested_seq.parent,
74  					bserrno);
75  		break;
76  	case SPDK_BS_CPL_TYPE_NONE:
77  		/* this completion's callback is handled elsewhere */
78  		break;
79  	}
80  }
81  
82  static void
83  spdk_bs_request_set_complete(struct spdk_bs_request_set *set)
84  {
85  	struct spdk_bs_cpl cpl = set->cpl;
86  	int bserrno = set->bserrno;
87  
88  	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
89  
90  	spdk_bs_call_cpl(&cpl, bserrno);
91  }
92  
93  static void
94  spdk_bs_sequence_completion(struct spdk_io_channel *channel, void *cb_arg, int bserrno)
95  {
96  	struct spdk_bs_request_set *set = cb_arg;
97  
98  	set->bserrno = bserrno;
99  	set->u.sequence.cb_fn((spdk_bs_sequence_t *)set, set->u.sequence.cb_arg, bserrno);
100  }
101  
102  spdk_bs_sequence_t *
103  spdk_bs_sequence_start(struct spdk_io_channel *_channel,
104  		       struct spdk_bs_cpl *cpl)
105  {
106  	struct spdk_bs_channel		*channel;
107  	struct spdk_bs_request_set	*set;
108  
109  	channel = spdk_io_channel_get_ctx(_channel);
110  
111  	set = TAILQ_FIRST(&channel->reqs);
112  	if (!set) {
113  		return NULL;
114  	}
115  	TAILQ_REMOVE(&channel->reqs, set, link);
116  
117  	set->cpl = *cpl;
118  	set->bserrno = 0;
119  	set->channel = channel;
120  
121  	set->cb_args.cb_fn = spdk_bs_sequence_completion;
122  	set->cb_args.cb_arg = set;
123  	set->cb_args.channel = channel->dev_channel;
124  
125  	return (spdk_bs_sequence_t *)set;
126  }
127  
128  void
129  spdk_bs_sequence_read_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
130  			     void *payload, uint64_t lba, uint32_t lba_count,
131  			     spdk_bs_sequence_cpl cb_fn, void *cb_arg)
132  {
133  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
134  	struct spdk_bs_channel       *channel = set->channel;
135  
136  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
137  		      lba);
138  
139  	set->u.sequence.cb_fn = cb_fn;
140  	set->u.sequence.cb_arg = cb_arg;
141  
142  	bs_dev->read(bs_dev, spdk_io_channel_from_ctx(channel), payload, lba, lba_count, &set->cb_args);
143  }
144  
145  void
146  spdk_bs_sequence_read_dev(spdk_bs_sequence_t *seq, void *payload,
147  			  uint64_t lba, uint32_t lba_count,
148  			  spdk_bs_sequence_cpl cb_fn, void *cb_arg)
149  {
150  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
151  	struct spdk_bs_channel       *channel = set->channel;
152  
153  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
154  		      lba);
155  
156  	set->u.sequence.cb_fn = cb_fn;
157  	set->u.sequence.cb_arg = cb_arg;
158  
159  	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
160  }
161  
162  void
163  spdk_bs_sequence_write_dev(spdk_bs_sequence_t *seq, void *payload,
164  			   uint64_t lba, uint32_t lba_count,
165  			   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
166  {
167  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
168  	struct spdk_bs_channel       *channel = set->channel;
169  
170  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
171  		      lba);
172  
173  	set->u.sequence.cb_fn = cb_fn;
174  	set->u.sequence.cb_arg = cb_arg;
175  
176  	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
177  			    &set->cb_args);
178  }
179  
180  void
181  spdk_bs_sequence_readv_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
182  			      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
183  			      spdk_bs_sequence_cpl cb_fn, void *cb_arg)
184  {
185  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
186  	struct spdk_bs_channel       *channel = set->channel;
187  
188  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
189  		      lba);
190  
191  	set->u.sequence.cb_fn = cb_fn;
192  	set->u.sequence.cb_arg = cb_arg;
193  
194  	bs_dev->readv(bs_dev, spdk_io_channel_from_ctx(channel), iov, iovcnt, lba, lba_count,
195  		      &set->cb_args);
196  }
197  
198  void
199  spdk_bs_sequence_readv_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
200  			   uint64_t lba, uint32_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
201  {
202  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
203  	struct spdk_bs_channel       *channel = set->channel;
204  
205  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
206  		      lba);
207  
208  	set->u.sequence.cb_fn = cb_fn;
209  	set->u.sequence.cb_arg = cb_arg;
210  	channel->dev->readv(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
211  			    &set->cb_args);
212  }
213  
214  void
215  spdk_bs_sequence_writev_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
216  			    uint64_t lba, uint32_t lba_count,
217  			    spdk_bs_sequence_cpl cb_fn, void *cb_arg)
218  {
219  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
220  	struct spdk_bs_channel       *channel = set->channel;
221  
222  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
223  		      lba);
224  
225  	set->u.sequence.cb_fn = cb_fn;
226  	set->u.sequence.cb_arg = cb_arg;
227  
228  	channel->dev->writev(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
229  			     &set->cb_args);
230  }
231  
232  void
233  spdk_bs_sequence_unmap_dev(spdk_bs_sequence_t *seq,
234  			   uint64_t lba, uint32_t lba_count,
235  			   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
236  {
237  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
238  	struct spdk_bs_channel       *channel = set->channel;
239  
240  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Unmapping %" PRIu32 " blocks at LBA %" PRIu64 "\n", lba_count,
241  		      lba);
242  
243  	set->u.sequence.cb_fn = cb_fn;
244  	set->u.sequence.cb_arg = cb_arg;
245  
246  	channel->dev->unmap(channel->dev, channel->dev_channel, lba, lba_count,
247  			    &set->cb_args);
248  }
249  
250  void
251  spdk_bs_sequence_write_zeroes_dev(spdk_bs_sequence_t *seq,
252  				  uint64_t lba, uint32_t lba_count,
253  				  spdk_bs_sequence_cpl cb_fn, void *cb_arg)
254  {
255  	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
256  	struct spdk_bs_channel       *channel = set->channel;
257  
258  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "writing zeroes to %" PRIu32 " blocks at LBA %" PRIu64 "\n",
259  		      lba_count, lba);
260  
261  	set->u.sequence.cb_fn = cb_fn;
262  	set->u.sequence.cb_arg = cb_arg;
263  
264  	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
265  				   &set->cb_args);
266  }
267  
268  void
269  spdk_bs_sequence_finish(spdk_bs_sequence_t *seq, int bserrno)
270  {
271  	if (bserrno != 0) {
272  		seq->bserrno = bserrno;
273  	}
274  	spdk_bs_request_set_complete((struct spdk_bs_request_set *)seq);
275  }
276  
277  void
278  spdk_bs_user_op_sequence_finish(void *cb_arg, int bserrno)
279  {
280  	spdk_bs_sequence_t *seq = cb_arg;
281  
282  	spdk_bs_sequence_finish(seq, bserrno);
283  }
284  
285  static void
286  spdk_bs_batch_completion(struct spdk_io_channel *_channel,
287  			 void *cb_arg, int bserrno)
288  {
289  	struct spdk_bs_request_set	*set = cb_arg;
290  
291  	set->u.batch.outstanding_ops--;
292  	if (bserrno != 0) {
293  		set->bserrno = bserrno;
294  	}
295  
296  	if (set->u.batch.outstanding_ops == 0 && set->u.batch.batch_closed) {
297  		if (set->u.batch.cb_fn) {
298  			set->cb_args.cb_fn = spdk_bs_sequence_completion;
299  			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, bserrno);
300  		} else {
301  			spdk_bs_request_set_complete(set);
302  		}
303  	}
304  }
305  
306  spdk_bs_batch_t *
307  spdk_bs_batch_open(struct spdk_io_channel *_channel,
308  		   struct spdk_bs_cpl *cpl)
309  {
310  	struct spdk_bs_channel		*channel;
311  	struct spdk_bs_request_set	*set;
312  
313  	channel = spdk_io_channel_get_ctx(_channel);
314  
315  	set = TAILQ_FIRST(&channel->reqs);
316  	if (!set) {
317  		return NULL;
318  	}
319  	TAILQ_REMOVE(&channel->reqs, set, link);
320  
321  	set->cpl = *cpl;
322  	set->bserrno = 0;
323  	set->channel = channel;
324  
325  	set->u.batch.cb_fn = NULL;
326  	set->u.batch.cb_arg = NULL;
327  	set->u.batch.outstanding_ops = 0;
328  	set->u.batch.batch_closed = 0;
329  
330  	set->cb_args.cb_fn = spdk_bs_batch_completion;
331  	set->cb_args.cb_arg = set;
332  	set->cb_args.channel = channel->dev_channel;
333  
334  	return (spdk_bs_batch_t *)set;
335  }
336  
337  void
338  spdk_bs_batch_read_bs_dev(spdk_bs_batch_t *batch, struct spdk_bs_dev *bs_dev,
339  			  void *payload, uint64_t lba, uint32_t lba_count)
340  {
341  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
342  	struct spdk_bs_channel		*channel = set->channel;
343  
344  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
345  		      lba);
346  
347  	set->u.batch.outstanding_ops++;
348  	bs_dev->read(bs_dev, spdk_io_channel_from_ctx(channel), payload, lba, lba_count, &set->cb_args);
349  }
350  
351  void
352  spdk_bs_batch_read_dev(spdk_bs_batch_t *batch, void *payload,
353  		       uint64_t lba, uint32_t lba_count)
354  {
355  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
356  	struct spdk_bs_channel		*channel = set->channel;
357  
358  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
359  		      lba);
360  
361  	set->u.batch.outstanding_ops++;
362  	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
363  }
364  
365  void
366  spdk_bs_batch_write_dev(spdk_bs_batch_t *batch, void *payload,
367  			uint64_t lba, uint32_t lba_count)
368  {
369  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
370  	struct spdk_bs_channel		*channel = set->channel;
371  
372  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu32 " blocks to LBA %" PRIu64 "\n", lba_count, lba);
373  
374  	set->u.batch.outstanding_ops++;
375  	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
376  			    &set->cb_args);
377  }
378  
379  void
380  spdk_bs_batch_unmap_dev(spdk_bs_batch_t *batch,
381  			uint64_t lba, uint32_t lba_count)
382  {
383  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
384  	struct spdk_bs_channel		*channel = set->channel;
385  
386  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Unmapping %" PRIu32 " blocks at LBA %" PRIu64 "\n", lba_count,
387  		      lba);
388  
389  	set->u.batch.outstanding_ops++;
390  	channel->dev->unmap(channel->dev, channel->dev_channel, lba, lba_count,
391  			    &set->cb_args);
392  }
393  
394  void
395  spdk_bs_batch_write_zeroes_dev(spdk_bs_batch_t *batch,
396  			       uint64_t lba, uint32_t lba_count)
397  {
398  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
399  	struct spdk_bs_channel		*channel = set->channel;
400  
401  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Zeroing %" PRIu32 " blocks at LBA %" PRIu64 "\n", lba_count, lba);
402  
403  	set->u.batch.outstanding_ops++;
404  	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
405  				   &set->cb_args);
406  }
407  
/*
 * Completion callback for blob-level operations issued from a batch.
 *
 * Adapts the (arg, bserrno) blob callback shape to spdk_bs_batch_completion(),
 * with arg being the request set.
 */
static void
spdk_bs_batch_blob_op_complete(void *arg, int bserrno)
{
	/*
	 * TODO: the channel parameter of spdk_bs_batch_completion() is not
	 *  actually used, so NULL is passed rather than looking the channel
	 *  up from the set.
	 */
	spdk_bs_batch_completion(NULL, arg, bserrno);
}
416  
417  void
418  spdk_bs_batch_read_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
419  			void *payload, uint64_t offset, uint64_t length)
420  {
421  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
422  	struct spdk_bs_channel		*channel = set->channel;
423  
424  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu64 " pages from offset %" PRIu64 "\n", length,
425  		      offset);
426  
427  	set->u.batch.outstanding_ops++;
428  	spdk_blob_io_read(blob, spdk_io_channel_from_ctx(channel), payload, offset,
429  			  length, spdk_bs_batch_blob_op_complete, set);
430  }
431  
432  void
433  spdk_bs_batch_write_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
434  			 void *payload, uint64_t offset, uint64_t length)
435  {
436  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
437  	struct spdk_bs_channel		*channel = set->channel;
438  
439  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu64 " pages from offset %" PRIu64 "\n", length,
440  		      offset);
441  
442  	set->u.batch.outstanding_ops++;
443  	spdk_blob_io_write(blob, spdk_io_channel_from_ctx(channel), payload, offset,
444  			   length, spdk_bs_batch_blob_op_complete, set);
445  }
446  
447  void
448  spdk_bs_batch_unmap_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
449  			 uint64_t offset, uint64_t length)
450  {
451  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
452  	struct spdk_bs_channel		*channel = set->channel;
453  
454  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Unmapping %" PRIu64 " pages from offset %" PRIu64 "\n", length,
455  		      offset);
456  
457  	set->u.batch.outstanding_ops++;
458  	spdk_blob_io_unmap(blob, spdk_io_channel_from_ctx(channel), offset, length,
459  			   spdk_bs_batch_blob_op_complete, set);
460  }
461  
462  void
463  spdk_bs_batch_write_zeroes_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
464  				uint64_t offset, uint64_t length)
465  {
466  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
467  	struct spdk_bs_channel		*channel = set->channel;
468  
469  	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Zeroing %" PRIu64 " pages from offset %" PRIu64 "\n", length,
470  		      offset);
471  
472  	set->u.batch.outstanding_ops++;
473  	spdk_blob_io_write_zeroes(blob, spdk_io_channel_from_ctx(channel), offset, length,
474  				  spdk_bs_batch_blob_op_complete, set);
475  }
476  
477  void
478  spdk_bs_batch_set_errno(spdk_bs_batch_t *batch, int bserrno)
479  {
480  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
481  
482  	set->bserrno = bserrno;
483  }
484  
485  void
486  spdk_bs_batch_close(spdk_bs_batch_t *batch)
487  {
488  	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
489  
490  	set->u.batch.batch_closed = 1;
491  
492  	if (set->u.batch.outstanding_ops == 0) {
493  		if (set->u.batch.cb_fn) {
494  			set->cb_args.cb_fn = spdk_bs_sequence_completion;
495  			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, set->bserrno);
496  		} else {
497  			spdk_bs_request_set_complete(set);
498  		}
499  	}
500  }
501  
502  spdk_bs_batch_t *
503  spdk_bs_sequence_to_batch(spdk_bs_sequence_t *seq, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
504  {
505  	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq;
506  
507  	set->u.batch.cb_fn = cb_fn;
508  	set->u.batch.cb_arg = cb_arg;
509  	set->u.batch.outstanding_ops = 0;
510  	set->u.batch.batch_closed = 0;
511  
512  	set->cb_args.cb_fn = spdk_bs_batch_completion;
513  
514  	return set;
515  }
516  
517  spdk_bs_sequence_t *
518  spdk_bs_batch_to_sequence(spdk_bs_batch_t *batch)
519  {
520  	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch;
521  
522  	set->u.batch.outstanding_ops++;
523  
524  	set->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
525  	set->cpl.u.blob_basic.cb_fn = spdk_bs_sequence_to_batch_completion;
526  	set->cpl.u.blob_basic.cb_arg = set;
527  	set->bserrno = 0;
528  
529  	set->cb_args.cb_fn = spdk_bs_sequence_completion;
530  	set->cb_args.cb_arg = set;
531  	set->cb_args.channel = set->channel->dev_channel;
532  
533  	return (spdk_bs_sequence_t *)set;
534  }
535  
536  spdk_bs_user_op_t *
537  spdk_bs_user_op_alloc(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl,
538  		      enum spdk_blob_op_type op_type, struct spdk_blob *blob,
539  		      void *payload, int iovcnt, uint64_t offset, uint64_t length)
540  {
541  	struct spdk_bs_channel		*channel;
542  	struct spdk_bs_request_set	*set;
543  	struct spdk_bs_user_op_args	*args;
544  
545  	channel = spdk_io_channel_get_ctx(_channel);
546  
547  	set = TAILQ_FIRST(&channel->reqs);
548  	if (!set) {
549  		return NULL;
550  	}
551  	TAILQ_REMOVE(&channel->reqs, set, link);
552  
553  	set->cpl = *cpl;
554  	set->channel = channel;
555  
556  	args = &set->u.user_op;
557  
558  	args->type = op_type;
559  	args->iovcnt = iovcnt;
560  	args->blob = blob;
561  	args->offset = offset;
562  	args->length = length;
563  	args->payload = payload;
564  
565  	return (spdk_bs_user_op_t *)set;
566  }
567  
568  void
569  spdk_bs_user_op_execute(spdk_bs_user_op_t *op)
570  {
571  	struct spdk_bs_request_set	*set;
572  	struct spdk_bs_user_op_args	*args;
573  	struct spdk_io_channel		*ch;
574  
575  	set = (struct spdk_bs_request_set *)op;
576  	args = &set->u.user_op;
577  	ch = spdk_io_channel_from_ctx(set->channel);
578  
579  	switch (args->type) {
580  	case SPDK_BLOB_READ:
581  		spdk_blob_io_read(args->blob, ch, args->payload, args->offset, args->length,
582  				  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
583  		break;
584  	case SPDK_BLOB_WRITE:
585  		spdk_blob_io_write(args->blob, ch, args->payload, args->offset, args->length,
586  				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
587  		break;
588  	case SPDK_BLOB_UNMAP:
589  		spdk_blob_io_unmap(args->blob, ch, args->offset, args->length,
590  				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
591  		break;
592  	case SPDK_BLOB_WRITE_ZEROES:
593  		spdk_blob_io_write_zeroes(args->blob, ch, args->offset, args->length,
594  					  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
595  		break;
596  	case SPDK_BLOB_READV:
597  		spdk_blob_io_readv(args->blob, ch, args->payload, args->iovcnt,
598  				   args->offset, args->length,
599  				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
600  		break;
601  	case SPDK_BLOB_WRITEV:
602  		spdk_blob_io_writev(args->blob, ch, args->payload, args->iovcnt,
603  				    args->offset, args->length,
604  				    set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
605  		break;
606  	}
607  	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
608  }
609  
610  void
611  spdk_bs_user_op_abort(spdk_bs_user_op_t *op)
612  {
613  	struct spdk_bs_request_set	*set;
614  
615  	set = (struct spdk_bs_request_set *)op;
616  
617  	set->cpl.u.blob_basic.cb_fn(set->cpl.u.blob_basic.cb_arg, -EIO);
618  	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
619  }
620  
621  void
622  spdk_bs_sequence_to_batch_completion(void *cb_arg, int bserrno)
623  {
624  	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)cb_arg;
625  
626  	set->u.batch.outstanding_ops--;
627  
628  	if (set->u.batch.outstanding_ops == 0 && set->u.batch.batch_closed) {
629  		if (set->cb_args.cb_fn) {
630  			set->cb_args.cb_fn(set->cb_args.channel, set->cb_args.cb_arg, bserrno);
631  		}
632  	}
633  }
634  
/* Register the "blob_rw" log component used by the SPDK_DEBUGLOG calls in this file. */
SPDK_LOG_REGISTER_COMPONENT("blob_rw", SPDK_LOG_BLOB_RW)
636