xref: /spdk/module/bdev/daos/bdev_daos.c (revision a6dbe3721eb3b5990707fc3e378c95e505dd8ab5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) croit GmbH.
3  *   All rights reserved.
4  */
5 
6 #include <sys/queue.h>
7 
8 #include "spdk/bdev.h"
9 #include "spdk/bdev_module.h"
10 #include "spdk/endian.h"
11 #include "spdk/env.h"
12 #include "spdk/json.h"
13 #include "spdk/thread.h"
14 #include "spdk/queue.h"
15 #include "spdk/string.h"
16 #include "spdk/stdinc.h"
17 #include "spdk/log.h"
18 
19 #include <daos.h>
20 #include <daos_event.h>
21 #include <daos_fs.h>
22 #include <daos_types.h>
23 #include <daos_pool.h>
24 #include <daos_cont.h>
25 #include <daos_errno.h>
26 
27 #include "bdev_daos.h"
28 
/* Upper bound on the number of iovecs a single read/write may carry; sizes diovs[] below. */
29 #define BDEV_DAOS_IOVECS_MAX 32
30 
/*
 * Per-I/O driver context. It lives in spdk_bdev_io::driver_ctx and its size is
 * reported to the bdev layer by bdev_daos_get_ctx_size().
 */
31 struct bdev_daos_task {
	/* DAOS event handed to dfs_read()/dfs_write(); the channel poller maps it back to the task */
32 	daos_event_t ev;
	/* Thread of the channel the request was submitted on; completions are delivered there */
33 	struct spdk_thread *submit_td;
	/* The bdev I/O this context belongs to */
34 	struct spdk_bdev_io *bdev_io;
35 
	/* Final status, stored before the completion is (possibly) forwarded to submit_td */
36 	enum spdk_bdev_io_status status;
37 
	/* Byte offset of the read/write; only used for debug logging after submission */
38 	uint64_t offset;
39 
40 	/* DAOS version of iovec and scatter/gather */
	/* Output argument of dfs_read(); not consumed anywhere else in this file */
41 	daos_size_t read_size;
	/* DAOS copies of the SPDK iovecs, referenced by sgl.sg_iovs */
42 	d_iov_t diovs[BDEV_DAOS_IOVECS_MAX];
	/* Scatter/gather list passed to dfs_read()/dfs_write() */
43 	d_sg_list_t sgl;
44 };
45 
/*
 * One DAOS-backed block device. The pointer to this structure is also the
 * io_device registered with spdk_io_device_register().
 */
46 struct bdev_daos {
	/* Generic SPDK bdev; disk.ctxt points back at this structure */
47 	struct spdk_bdev disk;
	/* DAOS object class used when the backing DFS object is opened */
48 	daos_oclass_id_t oclass;
49 
	/* Pool and container labels, stored NUL-terminated */
50 	char pool_name[DAOS_PROP_MAX_LABEL_BUF_LEN];
51 	char cont_name[DAOS_PROP_MAX_LABEL_BUF_LEN];
52 
	/* Context of the reset request currently waiting for in-flight I/O to drain */
53 	struct bdev_daos_task *reset_task;
	/* Poller that re-checks for in-flight I/O while a reset is pending */
54 	struct spdk_poller    *reset_retry_timer;
55 };
56 
/*
 * Per-channel state. Every channel opens its own pool/container handles,
 * DFS mount, DFS object and event queue in the channel create callback and
 * releases them in the destroy callback.
 */
57 struct bdev_daos_io_channel {
	/* Owning device (the io_device this channel was created for) */
58 	struct bdev_daos *disk;
	/* Poller that reaps completed events from 'queue' */
59 	struct spdk_poller *poller;
60 
	/* Pool connection and open container handle */
61 	daos_handle_t pool;
62 	daos_handle_t cont;
63 
	/* DFS mount of the container and the open DFS object backing the bdev */
64 	dfs_t *dfs;
65 	dfs_obj_t *obj;
	/* DAOS event queue that the asynchronous reads/writes of this channel are attached to */
66 	daos_handle_t queue;
67 };
68 
/*
 * Reference count of DAOS engine users. It is incremented by
 * bdev_get_daos_engine() and decremented by bdev_daos_put_engine(), both of
 * which take g_bdev_daos_init_mutex around the update.
 */
69 static uint32_t g_bdev_daos_init_count = 0;
70 static pthread_mutex_t g_bdev_daos_init_mutex = PTHREAD_MUTEX_INITIALIZER;
71 
/* Module init hook, defined at the end of the file */
72 static int bdev_daos_initialize(void);
73 
/* Take / drop a reference on the DAOS client library, defined at the end of the file */
74 static int bdev_get_daos_engine(void);
75 static int bdev_daos_put_engine(void);
76 
77 static int
78 bdev_daos_get_ctx_size(void)
79 {
80 	return sizeof(struct bdev_daos_task);
81 }
82 
/*
 * Module descriptor for the DAOS bdev module. Every bdev created by this file
 * points its 'module' field here, which is how resize/delete recognise their
 * own devices.
 */
83 static struct spdk_bdev_module daos_if = {
84 	.name = "daos",
85 	.module_init = bdev_daos_initialize,
86 	.get_ctx_size = bdev_daos_get_ctx_size,
87 };
88 
89 SPDK_BDEV_MODULE_REGISTER(daos, &daos_if)
90 
91 static void
92 bdev_daos_free(struct bdev_daos *bdev_daos)
93 {
94 	if (!bdev_daos) {
95 		return;
96 	}
97 
98 	free(bdev_daos->disk.name);
99 	free(bdev_daos);
100 }
101 
102 static void
103 bdev_daos_destruct_cb(void *io_device)
104 {
105 	int rc;
106 	struct bdev_daos *daos = io_device;
107 
108 	assert(daos != NULL);
109 
110 	bdev_daos_free(daos);
111 
112 	rc = bdev_daos_put_engine();
113 	if (rc) {
114 		SPDK_ERRLOG("could not de-initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
115 	}
116 }
117 
118 static int
119 bdev_daos_destruct(void *ctx)
120 {
121 	struct bdev_daos *daos = ctx;
122 
123 	SPDK_NOTICELOG("%s: destroying bdev_daos device\n", daos->disk.name);
124 
125 	spdk_io_device_unregister(daos, bdev_daos_destruct_cb);
126 
127 	return 0;
128 }
129 
130 static void
131 _bdev_daos_io_complete(void *bdev_daos_task)
132 {
133 	struct bdev_daos_task *task = bdev_daos_task;
134 
135 	SPDK_DEBUGLOG(bdev_daos, "completed IO at %#lx with status %s\n", task->offset,
136 		      task->status == SPDK_BDEV_IO_STATUS_SUCCESS ? "SUCCESS" : "FAILURE");
137 
138 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(task), task->status);
139 }
140 
141 static void
142 bdev_daos_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
143 {
144 	struct bdev_daos_task *task = (struct bdev_daos_task *)bdev_io->driver_ctx;
145 	struct spdk_thread *current_thread = spdk_get_thread();
146 
147 	assert(task->submit_td != NULL);
148 
149 	task->status = status;
150 	if (task->submit_td != current_thread) {
151 		spdk_thread_send_msg(task->submit_td, _bdev_daos_io_complete, task);
152 	} else {
153 		_bdev_daos_io_complete(task);
154 	}
155 }
156 
157 static int64_t
158 bdev_daos_writev(struct bdev_daos *daos, struct bdev_daos_io_channel *ch,
159 		 struct bdev_daos_task *task,
160 		 struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
161 {
162 	int rc;
163 	struct iovec *io = iov;
164 
165 	SPDK_DEBUGLOG(bdev_daos, "write %d iovs size %lu to off: %#lx\n",
166 		      iovcnt, nbytes, offset);
167 
168 	assert(ch != NULL);
169 	assert(daos != NULL);
170 	assert(task != NULL);
171 	assert(iov != NULL);
172 
173 	if (iovcnt > BDEV_DAOS_IOVECS_MAX) {
174 		SPDK_ERRLOG("iovs number [%d] exceeds max allowed limit [%d]\n", iovcnt,
175 			    BDEV_DAOS_IOVECS_MAX);
176 		return -E2BIG;
177 	}
178 
179 	if ((rc = daos_event_init(&task->ev, ch->queue, NULL))) {
180 		SPDK_ERRLOG("%s: could not initialize async event: " DF_RC "\n",
181 			    daos->disk.name, DP_RC(rc));
182 		return -EINVAL;
183 	}
184 
185 	for (int i = 0; i < iovcnt; i++, iov++) {
186 		d_iov_set(&(task->diovs[i]), io->iov_base, io->iov_len);
187 	}
188 
189 	task->sgl.sg_nr = iovcnt;
190 	task->sgl.sg_nr_out = 0;
191 	task->sgl.sg_iovs = task->diovs;
192 	task->offset = offset;
193 
194 	if ((rc = dfs_write(ch->dfs, ch->obj, &task->sgl, offset, &task->ev))) {
195 		SPDK_ERRLOG("%s: could not start async write: " DF_RC "\n",
196 			    daos->disk.name, DP_RC(rc));
197 		daos_event_fini(&task->ev);
198 		return -EINVAL;
199 	}
200 
201 	return nbytes;
202 }
203 
204 static int64_t
205 bdev_daos_readv(struct bdev_daos *daos, struct bdev_daos_io_channel *ch,
206 		struct bdev_daos_task *task,
207 		struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
208 {
209 	int rc;
210 	struct iovec *io = iov;
211 
212 	SPDK_DEBUGLOG(bdev_daos, "read %d iovs size %lu to off: %#lx\n",
213 		      iovcnt, nbytes, offset);
214 
215 	assert(ch != NULL);
216 	assert(daos != NULL);
217 	assert(task != NULL);
218 	assert(iov != NULL);
219 
220 	if (iovcnt > BDEV_DAOS_IOVECS_MAX) {
221 		SPDK_ERRLOG("iovs number [%d] exceeds max allowed limit [%d]\n", iovcnt,
222 			    BDEV_DAOS_IOVECS_MAX);
223 		return -E2BIG;
224 	}
225 
226 	if ((rc = daos_event_init(&task->ev, ch->queue, NULL))) {
227 		SPDK_ERRLOG("%s: could not initialize async event: " DF_RC "\n",
228 			    daos->disk.name, DP_RC(rc));
229 		return -EINVAL;
230 	}
231 
232 	for (int i = 0; i < iovcnt; i++, io++) {
233 		d_iov_set(&(task->diovs[i]), io->iov_base, io->iov_len);
234 	}
235 
236 	task->sgl.sg_nr = iovcnt;
237 	task->sgl.sg_nr_out = 0;
238 	task->sgl.sg_iovs = task->diovs;
239 	task->offset = offset;
240 
241 	if ((rc = dfs_read(ch->dfs, ch->obj, &task->sgl, offset, &task->read_size, &task->ev))) {
242 		SPDK_ERRLOG("%s: could not start async read: " DF_RC "\n",
243 			    daos->disk.name, DP_RC(rc));
244 		daos_event_fini(&task->ev);
245 		return -EINVAL;
246 	}
247 
248 	return nbytes;
249 }
250 
251 static void
252 bdev_daos_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
253 		     bool success)
254 {
255 	int64_t rc;
256 	struct bdev_daos_io_channel *dch = spdk_io_channel_get_ctx(ch);
257 
258 	if (!success) {
259 		bdev_daos_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
260 		return;
261 	}
262 
263 	rc = bdev_daos_readv((struct bdev_daos *)bdev_io->bdev->ctxt,
264 			     dch,
265 			     (struct bdev_daos_task *)bdev_io->driver_ctx,
266 			     bdev_io->u.bdev.iovs,
267 			     bdev_io->u.bdev.iovcnt,
268 			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
269 			     bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
270 
271 	if (rc < 0) {
272 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
273 		return;
274 	}
275 }
276 
277 static void
278 _bdev_daos_get_io_inflight(struct spdk_io_channel_iter *i)
279 {
280 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
281 	struct bdev_daos_io_channel *dch = spdk_io_channel_get_ctx(ch);
282 	int io_inflight = daos_eq_query(dch->queue, DAOS_EQR_WAITING, 0, NULL);
283 
284 	if (io_inflight > 0) {
285 		spdk_for_each_channel_continue(i, -1);
286 		return;
287 	}
288 
289 	spdk_for_each_channel_continue(i, 0);
290 }
291 
292 static int bdev_daos_reset_retry_timer(void *arg);
293 
294 static void
295 _bdev_daos_get_io_inflight_done(struct spdk_io_channel_iter *i, int status)
296 {
297 	struct bdev_daos *daos = spdk_io_channel_iter_get_ctx(i);
298 
299 	if (status == -1) {
300 		daos->reset_retry_timer = SPDK_POLLER_REGISTER(bdev_daos_reset_retry_timer, daos, 1000);
301 		return;
302 	}
303 
304 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(daos->reset_task), SPDK_BDEV_IO_STATUS_SUCCESS);
305 }
306 
307 static int
308 bdev_daos_reset_retry_timer(void *arg)
309 {
310 	struct bdev_daos *daos = arg;
311 
312 	if (daos->reset_retry_timer) {
313 		spdk_poller_unregister(&daos->reset_retry_timer);
314 	}
315 
316 	spdk_for_each_channel(daos,
317 			      _bdev_daos_get_io_inflight,
318 			      daos,
319 			      _bdev_daos_get_io_inflight_done);
320 
321 	return SPDK_POLLER_BUSY;
322 }
323 
324 static void
325 bdev_daos_reset(struct bdev_daos *daos, struct bdev_daos_task *task)
326 {
327 	assert(daos != NULL);
328 	assert(task != NULL);
329 
330 	daos->reset_task = task;
331 	bdev_daos_reset_retry_timer(daos);
332 }
333 
334 
335 static int64_t
336 bdev_daos_unmap(struct bdev_daos_io_channel *ch, uint64_t nbytes,
337 		uint64_t offset)
338 {
339 	SPDK_DEBUGLOG(bdev_daos, "unmap at %#lx with size %#lx\n", offset, nbytes);
340 	return dfs_punch(ch->dfs, ch->obj, offset, nbytes);
341 }
342 
343 static void
344 _bdev_daos_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
345 {
346 	struct bdev_daos_io_channel *dch = spdk_io_channel_get_ctx(ch);
347 
348 	int64_t rc;
349 	switch (bdev_io->type) {
350 	case SPDK_BDEV_IO_TYPE_READ:
351 		spdk_bdev_io_get_buf(bdev_io, bdev_daos_get_buf_cb,
352 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
353 		break;
354 
355 	case SPDK_BDEV_IO_TYPE_WRITE:
356 		rc = bdev_daos_writev((struct bdev_daos *)bdev_io->bdev->ctxt,
357 				      dch,
358 				      (struct bdev_daos_task *)bdev_io->driver_ctx,
359 				      bdev_io->u.bdev.iovs,
360 				      bdev_io->u.bdev.iovcnt,
361 				      bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
362 				      bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
363 		if (rc < 0) {
364 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
365 			return;
366 		}
367 		break;
368 
369 	case SPDK_BDEV_IO_TYPE_RESET:
370 		/* Can't cancel in-flight requests, but can wait for their completions */
371 		bdev_daos_reset((struct bdev_daos *)bdev_io->bdev->ctxt,
372 				(struct bdev_daos_task *)bdev_io->driver_ctx);
373 		break;
374 
375 	case SPDK_BDEV_IO_TYPE_FLUSH:
376 		/* NOOP because DAOS requests land on PMEM and writes are persistent upon completion */
377 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
378 		break;
379 
380 	case SPDK_BDEV_IO_TYPE_UNMAP:
381 		rc = bdev_daos_unmap(dch,
382 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
383 				     bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
384 		if (!rc) {
385 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
386 		} else {
387 			SPDK_DEBUGLOG(bdev_daos, "%s: could not unmap: " DF_RC "\n",
388 				      dch->disk->disk.name, DP_RC((int)rc));
389 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
390 		}
391 
392 		break;
393 
394 	default:
395 		SPDK_ERRLOG("Wrong io type\n");
396 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
397 		break;
398 	}
399 }
400 
401 static void
402 bdev_daos_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
403 {
404 	struct bdev_daos_task *task = (struct bdev_daos_task *)bdev_io->driver_ctx;
405 	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
406 
407 	assert(task != NULL);
408 
409 	task->submit_td = submit_td;
410 	task->bdev_io = bdev_io;
411 
412 	_bdev_daos_submit_request(ch, bdev_io);
413 }
414 
415 #define POLLING_EVENTS_NUM 64
416 
417 static int
418 bdev_daos_channel_poll(void *arg)
419 {
420 	daos_event_t *evp[POLLING_EVENTS_NUM];
421 	struct bdev_daos_io_channel *ch = arg;
422 
423 	assert(ch != NULL);
424 	assert(ch->disk != NULL);
425 
426 	int rc = daos_eq_poll(ch->queue, 0, DAOS_EQ_NOWAIT,
427 			      POLLING_EVENTS_NUM, evp);
428 
429 	if (rc < 0) {
430 		SPDK_DEBUGLOG(bdev_daos, "%s: could not poll daos event queue: " DF_RC "\n",
431 			      ch->disk->disk.name, DP_RC(rc));
432 		/*
433 		 * TODO: There are cases when this is self healing, e.g.
434 		 * brief network issues, DAOS agent restarting etc.
435 		 * However, if the issue persists over some time better would be
436 		 * to remove a bdev or the whole controller
437 		 */
438 		return SPDK_POLLER_BUSY;
439 	}
440 
441 	for (int i = 0; i < rc; ++i) {
442 		struct bdev_daos_task *task = container_of(evp[i], struct bdev_daos_task, ev);
443 		enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
444 
445 		assert(task != NULL);
446 
447 		if (task->ev.ev_error != DER_SUCCESS) {
448 			status = SPDK_BDEV_IO_STATUS_FAILED;
449 		}
450 
451 		daos_event_fini(&task->ev);
452 		bdev_daos_io_complete(task->bdev_io, status);
453 	}
454 
455 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
456 }
457 
458 static bool
459 bdev_daos_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
460 {
461 	switch (io_type) {
462 	case SPDK_BDEV_IO_TYPE_READ:
463 	case SPDK_BDEV_IO_TYPE_WRITE:
464 	case SPDK_BDEV_IO_TYPE_RESET:
465 	case SPDK_BDEV_IO_TYPE_FLUSH:
466 	case SPDK_BDEV_IO_TYPE_UNMAP:
467 		return true;
468 
469 	default:
470 		return false;
471 	}
472 }
473 
/*
 * bdev 'get_io_channel' callback: returns the I/O channel of the calling
 * thread for the io_device identified by ctx.
 */
static struct spdk_io_channel *
bdev_daos_get_io_channel(void *ctx)
{
	struct spdk_io_channel *channel = spdk_get_io_channel(ctx);

	return channel;
}
479 
480 static void
481 bdev_daos_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
482 {
483 	char uuid_str[SPDK_UUID_STRING_LEN];
484 	struct bdev_daos *daos = bdev->ctxt;
485 
486 	spdk_json_write_object_begin(w);
487 
488 	spdk_json_write_named_string(w, "method", "bdev_daos_create");
489 
490 	spdk_json_write_named_object_begin(w, "params");
491 	spdk_json_write_named_string(w, "name", bdev->name);
492 	spdk_json_write_named_string(w, "pool", daos->pool_name);
493 	spdk_json_write_named_string(w, "cont", daos->cont_name);
494 	spdk_json_write_named_uint64(w, "num_blocks", bdev->blockcnt);
495 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
496 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
497 	spdk_json_write_named_string(w, "uuid", uuid_str);
498 
499 	spdk_json_write_object_end(w);
500 
501 	spdk_json_write_object_end(w);
502 }
503 
/* bdev function table installed on every DAOS bdev by create_bdev_daos() */
504 static const struct spdk_bdev_fn_table daos_fn_table = {
505 	.destruct		= bdev_daos_destruct,
506 	.submit_request		= bdev_daos_submit_request,
507 	.io_type_supported	= bdev_daos_io_type_supported,
508 	.get_io_channel		= bdev_daos_get_io_channel,
509 	.write_config_json	= bdev_daos_write_json_config,
510 };
511 
512 static void *
513 _bdev_daos_io_channel_create_cb(void *ctx)
514 {
515 	int rc = 0 ;
516 	struct bdev_daos_io_channel *ch = ctx;
517 	struct bdev_daos *daos = ch->disk;
518 
519 	daos_pool_info_t pinfo;
520 	daos_cont_info_t cinfo;
521 
522 	int fd_oflag = O_CREAT | O_RDWR;
523 	mode_t mode = S_IFREG | S_IRWXU | S_IRWXG | S_IRWXO;
524 
525 	rc = bdev_get_daos_engine();
526 	if (rc) {
527 		SPDK_ERRLOG("could not initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
528 		return NULL;
529 	}
530 
531 	SPDK_DEBUGLOG(bdev_daos, "connecting to daos pool '%s'\n", daos->pool_name);
532 	if ((rc = daos_pool_connect(daos->pool_name, NULL, DAOS_PC_RW, &ch->pool, &pinfo, NULL))) {
533 		SPDK_ERRLOG("%s: could not connect to daos pool: " DF_RC "\n",
534 			    daos->disk.name, DP_RC(rc));
535 		return NULL;
536 	}
537 	SPDK_DEBUGLOG(bdev_daos, "connecting to daos container '%s'\n", daos->cont_name);
538 	if ((rc = daos_cont_open(ch->pool, daos->cont_name, DAOS_COO_RW, &ch->cont, &cinfo, NULL))) {
539 		SPDK_ERRLOG("%s: could not open daos container: " DF_RC "\n",
540 			    daos->disk.name, DP_RC(rc));
541 		goto cleanup_pool;
542 	}
543 	SPDK_DEBUGLOG(bdev_daos, "mounting daos dfs\n");
544 	if ((rc = dfs_mount(ch->pool, ch->cont, O_RDWR, &ch->dfs))) {
545 		SPDK_ERRLOG("%s: could not mount daos dfs: " DF_RC "\n",
546 			    daos->disk.name, DP_RC(rc));
547 		goto cleanup_cont;
548 	}
549 	SPDK_DEBUGLOG(bdev_daos, "opening dfs object\n");
550 	if ((rc = dfs_open(ch->dfs, NULL, daos->disk.name, mode, fd_oflag, daos->oclass,
551 			   0, NULL, &ch->obj))) {
552 		SPDK_ERRLOG("%s: could not open dfs object: " DF_RC "\n",
553 			    daos->disk.name, DP_RC(rc));
554 		goto cleanup_mount;
555 	}
556 	if ((rc = daos_eq_create(&ch->queue))) {
557 		SPDK_ERRLOG("%s: could not create daos event queue: " DF_RC "\n",
558 			    daos->disk.name, DP_RC(rc));
559 		goto cleanup_obj;
560 	}
561 
562 	return ctx;
563 
564 cleanup_obj:
565 	dfs_release(ch->obj);
566 cleanup_mount:
567 	dfs_umount(ch->dfs);
568 cleanup_cont:
569 	daos_cont_close(ch->cont, NULL);
570 cleanup_pool:
571 	daos_pool_disconnect(ch->pool, NULL);
572 
573 	return NULL;
574 }
575 
576 static int
577 bdev_daos_io_channel_create_cb(void *io_device, void *ctx_buf)
578 {
579 	struct bdev_daos_io_channel *ch = ctx_buf;
580 
581 	ch->disk = io_device;
582 
583 	if (spdk_call_unaffinitized(_bdev_daos_io_channel_create_cb, ch) == NULL) {
584 		return -EINVAL;
585 	}
586 
587 	SPDK_DEBUGLOG(bdev_daos, "%s: starting daos event queue poller\n",
588 		      ch->disk->disk.name);
589 
590 	ch->poller = SPDK_POLLER_REGISTER(bdev_daos_channel_poll, ch, 0);
591 
592 	return 0;
593 }
594 
595 static void
596 bdev_daos_io_channel_destroy_cb(void *io_device, void *ctx_buf)
597 {
598 	int rc;
599 	struct bdev_daos_io_channel *ch = ctx_buf;
600 
601 	SPDK_DEBUGLOG(bdev_daos, "stopping daos event queue poller\n");
602 
603 	spdk_poller_unregister(&ch->poller);
604 
605 	if ((rc = daos_eq_destroy(ch->queue, DAOS_EQ_DESTROY_FORCE))) {
606 		SPDK_ERRLOG("could not destroy daos event queue: " DF_RC "\n", DP_RC(rc));
607 	}
608 	if ((rc = dfs_release(ch->obj))) {
609 		SPDK_ERRLOG("could not release dfs object: " DF_RC "\n", DP_RC(rc));
610 	}
611 	if ((rc = dfs_umount(ch->dfs))) {
612 		SPDK_ERRLOG("could not unmount dfs: " DF_RC "\n", DP_RC(rc));
613 	}
614 	if ((rc = daos_cont_close(ch->cont, NULL))) {
615 		SPDK_ERRLOG("could not close container: " DF_RC "\n", DP_RC(rc));
616 	}
617 	if ((rc = daos_pool_disconnect(ch->pool, NULL))) {
618 		SPDK_ERRLOG("could not disconnect from pool: " DF_RC "\n", DP_RC(rc));
619 	}
620 	rc = bdev_daos_put_engine();
621 	if (rc) {
622 		SPDK_ERRLOG("could not de-initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
623 	}
624 }
625 
626 int
627 create_bdev_daos(struct spdk_bdev **bdev,
628 		 const char *name, const struct spdk_uuid *uuid,
629 		 const char *pool, const char *cont, const char *oclass,
630 		 uint64_t num_blocks, uint32_t block_size)
631 {
632 	int rc;
633 	size_t len;
634 	struct bdev_daos *daos;
635 	struct bdev_daos_io_channel ch = {};
636 
637 	SPDK_NOTICELOG("%s: creating bdev_daos disk on '%s:%s'\n", name, pool, cont);
638 
639 	if (num_blocks == 0) {
640 		SPDK_ERRLOG("Disk num_blocks must be greater than 0");
641 		return -EINVAL;
642 	}
643 
644 	if (block_size % 512) {
645 		SPDK_ERRLOG("block size must be 512 bytes aligned\n");
646 		return -EINVAL;
647 	}
648 
649 	if (!name) {
650 		SPDK_ERRLOG("device name cannot be empty\n");
651 		return -EINVAL;
652 	}
653 
654 	if (!pool) {
655 		SPDK_ERRLOG("daos pool cannot be empty\n");
656 		return -EINVAL;
657 	}
658 	if (!cont) {
659 		SPDK_ERRLOG("daos cont cannot be empty\n");
660 		return -EINVAL;
661 	}
662 
663 	daos = calloc(1, sizeof(*daos));
664 	if (!daos) {
665 		SPDK_ERRLOG("calloc() failed\n");
666 		return -ENOMEM;
667 	}
668 
669 	if (!oclass) {
670 		oclass = "SX"; /* Max throughput by default */
671 	}
672 	daos->oclass = daos_oclass_name2id(oclass);
673 	if (daos->oclass == OC_UNKNOWN) {
674 		SPDK_ERRLOG("could not parse daos oclass: '%s'\n", oclass);
675 		free(daos);
676 		return -EINVAL;
677 	}
678 
679 	len = strlen(pool);
680 	if (len > DAOS_PROP_LABEL_MAX_LEN) {
681 		SPDK_ERRLOG("daos pool name is too long\n");
682 		free(daos);
683 		return -EINVAL;
684 	}
685 	memcpy(daos->pool_name, pool, len);
686 
687 	len = strlen(cont);
688 	if (len > DAOS_PROP_LABEL_MAX_LEN) {
689 		SPDK_ERRLOG("daos cont name is too long\n");
690 		free(daos);
691 		return -EINVAL;
692 	}
693 	memcpy(daos->cont_name, cont, len);
694 
695 	daos->disk.name = strdup(name);
696 	daos->disk.product_name = "DAOS bdev";
697 
698 	daos->disk.write_cache = 0;
699 	daos->disk.blocklen = block_size;
700 	daos->disk.blockcnt = num_blocks;
701 
702 	if (uuid) {
703 		daos->disk.uuid = *uuid;
704 	} else {
705 		spdk_uuid_generate(&daos->disk.uuid);
706 	}
707 
708 	daos->disk.ctxt = daos;
709 	daos->disk.fn_table = &daos_fn_table;
710 	daos->disk.module = &daos_if;
711 
712 	rc = bdev_get_daos_engine();
713 	if (rc) {
714 		SPDK_ERRLOG("could not initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
715 		bdev_daos_free(daos);
716 		return rc;
717 	}
718 
719 	/* We try to connect to the DAOS container during channel creation, so simulate
720 	 * creating a channel here, so that we can return a failure when the DAOS bdev
721 	 * is created, instead of finding it out later when the first channel is created
722 	 * and leaving unusable bdev registered.
723 	 */
724 	rc = bdev_daos_io_channel_create_cb(daos, &ch);
725 	if (rc) {
726 		SPDK_ERRLOG("'%s' could not initialize io-channel: %s", name, strerror(-rc));
727 		bdev_daos_free(daos);
728 		return rc;
729 	}
730 	bdev_daos_io_channel_destroy_cb(daos, &ch);
731 
732 	spdk_io_device_register(daos, bdev_daos_io_channel_create_cb,
733 				bdev_daos_io_channel_destroy_cb,
734 				sizeof(struct bdev_daos_io_channel),
735 				daos->disk.name);
736 
737 
738 	rc = spdk_bdev_register(&daos->disk);
739 	if (rc) {
740 		spdk_io_device_unregister(daos, NULL);
741 		bdev_daos_free(daos);
742 		return rc;
743 	}
744 
745 	*bdev = &(daos->disk);
746 
747 	return rc;
748 }
749 
750 static void
751 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
752 {
753 }
754 
755 int
756 bdev_daos_resize(const char *name, const uint64_t new_size_in_mb)
757 {
758 	int rc = 0;
759 	struct spdk_bdev_desc *desc;
760 	struct spdk_bdev *bdev;
761 	struct spdk_io_channel *ch;
762 	struct bdev_daos_io_channel *dch;
763 	uint64_t new_size_in_byte;
764 	uint64_t current_size_in_mb;
765 
766 	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
767 	if (rc != 0) {
768 		return rc;
769 	}
770 
771 	bdev = spdk_bdev_desc_get_bdev(desc);
772 	if (bdev->module != &daos_if) {
773 		rc = -EINVAL;
774 		goto exit;
775 	}
776 
777 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
778 	if (current_size_in_mb > new_size_in_mb) {
779 		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
780 		rc = -EINVAL;
781 		goto exit;
782 	}
783 
784 	ch = bdev_daos_get_io_channel(bdev);
785 	dch = spdk_io_channel_get_ctx(ch);
786 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
787 
788 	rc = dfs_punch(dch->dfs, dch->obj, new_size_in_byte, DFS_MAX_FSIZE);
789 	spdk_put_io_channel(ch);
790 	if (rc != 0) {
791 		SPDK_ERRLOG("failed to resize daos bdev: " DF_RC "\n", DP_RC(rc));
792 		rc = -EINTR;
793 		goto exit;
794 	}
795 
796 	SPDK_NOTICELOG("DAOS bdev device is resized: bdev name %s, old block count %" PRIu64
797 		       ", new block count %"
798 		       PRIu64 "\n",
799 		       bdev->name,
800 		       bdev->blockcnt,
801 		       new_size_in_byte / bdev->blocklen);
802 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
803 	if (rc != 0) {
804 		SPDK_ERRLOG("failed to notify block cnt change.\n");
805 	}
806 
807 exit:
808 	spdk_bdev_close(desc);
809 	return rc;
810 }
811 
812 void
813 delete_bdev_daos(struct spdk_bdev *bdev, spdk_delete_daos_complete cb_fn, void *cb_arg)
814 {
815 	if (!bdev || bdev->module != &daos_if) {
816 		cb_fn(cb_arg, -ENODEV);
817 		return;
818 	}
819 
820 	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
821 }
822 
823 static int
824 bdev_get_daos_engine(void)
825 {
826 	int rc = 0;
827 
828 	pthread_mutex_lock(&g_bdev_daos_init_mutex);
829 	if (g_bdev_daos_init_count++ > 0) {
830 		pthread_mutex_unlock(&g_bdev_daos_init_mutex);
831 		return 0;
832 	}
833 	SPDK_DEBUGLOG(bdev_daos, "initializing DAOS engine\n");
834 
835 	rc = daos_init();
836 	pthread_mutex_unlock(&g_bdev_daos_init_mutex);
837 
838 	if (rc != -DER_ALREADY && rc) {
839 		return rc;
840 	}
841 	return 0;
842 }
843 
844 static int
845 bdev_daos_put_engine(void)
846 {
847 	int rc = 0;
848 
849 	pthread_mutex_lock(&g_bdev_daos_init_mutex);
850 	if (--g_bdev_daos_init_count > 0) {
851 		pthread_mutex_unlock(&g_bdev_daos_init_mutex);
852 		return 0;
853 	}
854 	SPDK_DEBUGLOG(bdev_daos, "de-initializing DAOS engine\n");
855 
856 	rc = daos_fini();
857 	pthread_mutex_unlock(&g_bdev_daos_init_mutex);
858 
859 	return rc;
860 }
861 
/*
 * Module init hook. Nothing to do here: DAOS engine and client
 * initialization happens during the first bdev creation. Always returns 0.
 */
static int
bdev_daos_initialize(void)
{
	const int rc = 0;

	return rc;
}
869 
870 SPDK_LOG_REGISTER_COMPONENT(bdev_daos)
871