xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision 0098e636761237b77c12c30c2408263a5d2260cc)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "bdev_rbd.h"
9 
10 #include <rbd/librbd.h>
11 #include <rados/librados.h>
12 
13 #include "spdk/env.h"
14 #include "spdk/bdev.h"
15 #include "spdk/thread.h"
16 #include "spdk/json.h"
17 #include "spdk/string.h"
18 #include "spdk/util.h"
19 #include "spdk/likely.h"
20 
21 #include "spdk/bdev_module.h"
22 #include "spdk/log.h"
23 
24 static int bdev_rbd_count = 0;
25 
26 struct bdev_rbd {
27 	struct spdk_bdev disk;
28 	char *rbd_name;
29 	char *user_id;
30 	char *pool_name;
31 	char **config;
32 
33 	rados_t cluster;
34 	rados_t *cluster_p;
35 	char *cluster_name;
36 
37 	rados_ioctx_t io_ctx;
38 	rbd_image_t image;
39 
40 	rbd_image_info_t info;
41 	pthread_mutex_t mutex;
42 	struct spdk_thread *main_td;
43 	struct spdk_thread *destruct_td;
44 	uint32_t ch_count;
45 	struct spdk_io_channel *group_ch;
46 
47 	TAILQ_ENTRY(bdev_rbd) tailq;
48 	struct spdk_poller *reset_timer;
49 	struct spdk_bdev_io *reset_bdev_io;
50 };
51 
52 struct bdev_rbd_io_channel {
53 	struct bdev_rbd *disk;
54 };
55 
56 struct bdev_rbd_io {
57 	struct			spdk_thread *submit_td;
58 	enum			spdk_bdev_io_status status;
59 	rbd_completion_t	comp;
60 	size_t			total_len;
61 };
62 
63 struct bdev_rbd_cluster {
64 	char *name;
65 	char *user_id;
66 	char **config_param;
67 	char *config_file;
68 	char *key_file;
69 	rados_t cluster;
70 	uint32_t ref;
71 	STAILQ_ENTRY(bdev_rbd_cluster) link;
72 };
73 
74 static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
75 			g_map_bdev_rbd_cluster);
76 static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
77 
78 static void
79 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
80 {
81 	assert(entry != NULL);
82 
83 	bdev_rbd_free_config(entry->config_param);
84 	free(entry->config_file);
85 	free(entry->key_file);
86 	free(entry->user_id);
87 	free(entry->name);
88 	free(entry);
89 }
90 
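/* Drop one reference on a shared cluster obtained from bdev_rbd_get_cluster() and
 * clear the caller's pointer so the same reference cannot be put twice. */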
91 static void
92 bdev_rbd_put_cluster(rados_t **cluster)
93 {
94 	struct bdev_rbd_cluster *entry;
95 
96 	assert(cluster != NULL);
97 
98 	/* No need to go through the map if *cluster is NULL. */
99 	if (*cluster == NULL) {
100 		return;
101 	}
102 
103 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
104 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
105 		if (*cluster != &entry->cluster) {
106 			continue;
107 		}
108 
109 		assert(entry->ref > 0);
110 		entry->ref--;
111 		*cluster = NULL;
112 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
113 		return;
114 	}
115 
116 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
117 	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
118 }
119 
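/* Release everything owned by a bdev_rbd: name strings, the duplicated config, the
 * rados ioctx, and either the shared-cluster reference or the private cluster connection. */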
120 static void
121 bdev_rbd_free(struct bdev_rbd *rbd)
122 {
123 	if (!rbd) {
124 		return;
125 	}
126 
127 	free(rbd->disk.name);
128 	free(rbd->rbd_name);
129 	free(rbd->user_id);
130 	free(rbd->pool_name);
131 	bdev_rbd_free_config(rbd->config);
132 
133 	if (rbd->io_ctx) {
134 		rados_ioctx_destroy(rbd->io_ctx);
135 	}
136 
137 	if (rbd->cluster_name) {
138 		bdev_rbd_put_cluster(&rbd->cluster_p);
139 		free(rbd->cluster_name);
140 	} else if (rbd->cluster) {
141 		rados_shutdown(rbd->cluster);
142 	}
143 
144 	pthread_mutex_destroy(&rbd->mutex);
145 	free(rbd);
146 }
147 
148 void
149 bdev_rbd_free_config(char **config)
150 {
151 	char **entry;
152 
153 	if (config) {
154 		for (entry = config; *entry; entry++) {
155 			free(*entry);
156 		}
157 		free(config);
158 	}
159 }
160 
161 char **
162 bdev_rbd_dup_config(const char *const *config)
163 {
164 	size_t count;
165 	char **copy;
166 
167 	if (!config) {
168 		return NULL;
169 	}
170 	for (count = 0; config[count]; count++) {}
171 	copy = calloc(count + 1, sizeof(*copy));
172 	if (!copy) {
173 		return NULL;
174 	}
175 	for (count = 0; config[count]; count++) {
176 		if (!(copy[count] = strdup(config[count]))) {
177 			bdev_rbd_free_config(copy);
178 			return NULL;
179 		}
180 	}
181 	return copy;
182 }
183 
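/* Create and connect a private rados handle.  When config is provided it is a
 * NULL-terminated array of alternating key/value strings; otherwise the default
 * Ceph configuration file is read. */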
184 static int
185 bdev_rados_cluster_init(const char *user_id, const char *const *config,
186 			rados_t *cluster)
187 {
188 	int ret;
189 
190 	ret = rados_create(cluster, user_id);
191 	if (ret < 0) {
192 		SPDK_ERRLOG("Failed to create rados_t struct\n");
193 		return -1;
194 	}
195 
196 	if (config) {
197 		const char *const *entry = config;
198 		while (*entry) {
199 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
200 			if (ret < 0) {
201 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
202 				rados_shutdown(*cluster);
203 				return -1;
204 			}
205 			entry += 2;
206 		}
207 	} else {
208 		ret = rados_conf_read_file(*cluster, NULL);
209 		if (ret < 0) {
210 			SPDK_ERRLOG("Failed to read conf file\n");
211 			rados_shutdown(*cluster);
212 			return -1;
213 		}
214 	}
215 
216 	ret = rados_connect(*cluster);
217 	if (ret < 0) {
218 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
219 		rados_shutdown(*cluster);
220 		return -1;
221 	}
222 
223 	return 0;
224 }
225 
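/* Look up a registered cluster by name and take a reference on it; returns -1 if
 * no cluster with that name has been registered. */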
226 static int
227 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
228 {
229 	struct bdev_rbd_cluster *entry;
230 
231 	if (cluster == NULL) {
232 		SPDK_ERRLOG("cluster should not be NULL\n");
233 		return -1;
234 	}
235 
236 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
237 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
238 		if (strcmp(cluster_name, entry->name) == 0) {
239 			entry->ref++;
240 			*cluster = &entry->cluster;
241 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
242 			return 0;
243 		}
244 	}
245 
246 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
247 	return -1;
248 }
249 
250 static int
251 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
252 {
253 	int ret;
254 
255 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
256 	if (ret < 0) {
257 		SPDK_ERRLOG("Failed to create rados_t struct\n");
258 		return -1;
259 	}
260 
261 	return ret;
262 }
263 
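/* Executed via spdk_call_unaffinitized() so that threads spawned by librados do not
 * inherit the SPDK thread's CPU affinity. */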
264 static void *
265 bdev_rbd_cluster_handle(void *arg)
266 {
267 	void *ret = arg;
268 	struct bdev_rbd *rbd = arg;
269 	int rc;
270 
271 	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
272 				     &rbd->cluster);
273 	if (rc < 0) {
274 		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
275 			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
276 		ret = NULL;
277 	}
278 
279 	return ret;
280 }
281 
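/* Also executed unaffinitized: creates the ioctx, opens the image once to rbd_stat()
 * it (caching the size in rbd->info), then closes it again. */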
282 static void *
283 bdev_rbd_init_context(void *arg)
284 {
285 	struct bdev_rbd *rbd = arg;
286 	int rc;
287 
288 	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
289 		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
290 		return NULL;
291 	}
292 
293 	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
294 	if (rc < 0) {
295 		SPDK_ERRLOG("Failed to open specified rbd device\n");
296 		return NULL;
297 	}
298 
299 	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
300 	rbd_close(rbd->image);
301 	if (rc < 0) {
302 		SPDK_ERRLOG("Failed to stat specified rbd device\n");
303 		return NULL;
304 	}
305 
306 	return arg;
307 }
308 
309 static int
310 bdev_rbd_init(struct bdev_rbd *rbd)
311 {
312 	int ret = 0;
313 
314 	if (!rbd->cluster_name) {
315 		rbd->cluster_p = &rbd->cluster;
316 		/* The cluster should be created on a non-SPDK thread to avoid conflicts
317 		 * between the Rados threads and SPDK threads. */
318 		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
319 			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
320 			return -1;
321 		}
322 	} else {
323 		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
324 		if (ret < 0) {
325 			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
326 				    rbd, rbd->cluster_name);
327 			return -1;
328 		}
329 	}
330 
331 	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
332 		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		/* Propagate the failure; otherwise the caller would register a bdev
		 * with an uninitialized rbd->info. */
		return -1;
333 	}
334 
335 	return ret;
336 }
337 
338 static void
339 bdev_rbd_exit(rbd_image_t image)
340 {
341 	rbd_flush(image);
342 	rbd_close(image);
343 }
344 
345 static void
346 _bdev_rbd_io_complete(void *_rbd_io)
347 {
348 	struct bdev_rbd_io *rbd_io = _rbd_io;
349 
350 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
351 }
352 
353 static void
354 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
355 {
356 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
357 	struct spdk_thread *current_thread = spdk_get_thread();
358 
359 	rbd_io->status = status;
360 	assert(rbd_io->submit_td != NULL);
361 	if (rbd_io->submit_td != current_thread) {
362 		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
363 	} else {
364 		_bdev_rbd_io_complete(rbd_io);
365 	}
366 }
367 
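/* Completion callback invoked by librbd.  It is not guaranteed to run on an SPDK
 * thread, so bdev_rbd_io_complete() forwards the completion back to the submitting
 * thread when needed. */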
368 static void
369 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
370 {
371 	int io_status;
372 	struct spdk_bdev_io *bdev_io;
373 	struct bdev_rbd_io *rbd_io;
374 	enum spdk_bdev_io_status bio_status;
375 
376 	bdev_io = rbd_aio_get_arg(cb);
377 	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
378 	io_status = rbd_aio_get_return_value(cb);
379 	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
380 
381 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
382 		if ((int)rbd_io->total_len != io_status) {
383 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
384 		}
385 	} else {
386 		/* For others, 0 means success */
387 		if (io_status != 0) {
388 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
389 		}
390 	}
391 
392 	rbd_aio_release(cb);
393 
394 	bdev_rbd_io_complete(bdev_io, bio_status);
395 }
396 
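/* Map a bdev I/O onto the corresponding librbd AIO call (read, write, unmap/discard,
 * flush or write-zeroes).  Any failure completes the bdev I/O with
 * SPDK_BDEV_IO_STATUS_FAILED. */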
397 static void
398 bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
399 		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
400 {
401 	int ret;
402 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
403 	rbd_image_t image = disk->image;
404 
405 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
406 					&rbd_io->comp);
407 	if (ret < 0) {
408 		goto err;
409 	}
410 
411 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
412 		rbd_io->total_len = len;
413 		if (spdk_likely(iovcnt == 1)) {
414 			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
415 		} else {
416 			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
417 		}
418 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
419 		if (spdk_likely(iovcnt == 1)) {
420 			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
421 		} else {
422 			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
423 		}
424 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
425 		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
426 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
427 		ret = rbd_aio_flush(image, rbd_io->comp);
428 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
429 		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
430 	}
431 
432 	if (ret < 0) {
433 		rbd_aio_release(rbd_io->comp);
434 		goto err;
435 	}
436 
437 	return;
438 
439 err:
440 	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
441 }
442 
443 static int bdev_rbd_library_init(void);
444 static void bdev_rbd_library_fini(void);
445 
446 static int
447 bdev_rbd_get_ctx_size(void)
448 {
449 	return sizeof(struct bdev_rbd_io);
450 }
451 
452 static struct spdk_bdev_module rbd_if = {
453 	.name = "rbd",
454 	.module_init = bdev_rbd_library_init,
455 	.module_fini = bdev_rbd_library_fini,
456 	.get_ctx_size = bdev_rbd_get_ctx_size,
457 
458 };
459 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
460 
461 static int
462 bdev_rbd_reset_timer(void *arg)
463 {
464 	struct bdev_rbd *disk = arg;
465 
466 	/*
467 	 * TODO: This should check if any I/O is still in flight before completing the reset.
468 	 * For now, just complete after the timer expires.
469 	 */
470 	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
471 	spdk_poller_unregister(&disk->reset_timer);
472 	disk->reset_bdev_io = NULL;
473 
474 	return SPDK_POLLER_BUSY;
475 }
476 
477 static void
478 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
479 {
480 	/*
481 	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
482 	 * timer to wait for in-flight I/O to complete.
483 	 */
484 	assert(disk->reset_bdev_io == NULL);
485 	disk->reset_bdev_io = bdev_io;
486 	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
487 }
488 
489 static void
490 _bdev_rbd_destruct_done(void *io_device)
491 {
492 	struct bdev_rbd *rbd = io_device;
493 
494 	assert(rbd != NULL);
495 	assert(rbd->ch_count == 0);
496 
497 	spdk_bdev_destruct_done(&rbd->disk, 0);
498 	bdev_rbd_free(rbd);
499 }
500 
501 static void
502 bdev_rbd_free_cb(void *io_device)
503 {
504 	struct bdev_rbd *rbd = io_device;
505 
506 	/* The io device has been unregistered.  Send a message back to the
507 	 * original thread that started the destruct operation, so that the
508 	 * bdev unregister callback is invoked on the same thread that started
509 	 * this whole process.
510 	 */
511 	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
512 }
513 
514 static void
515 _bdev_rbd_destruct(void *ctx)
516 {
517 	struct bdev_rbd *rbd = ctx;
518 
519 	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
520 }
521 
522 static int
523 bdev_rbd_destruct(void *ctx)
524 {
525 	struct bdev_rbd *rbd = ctx;
526 	struct spdk_thread *td;
527 
528 	if (rbd->main_td == NULL) {
529 		td = spdk_get_thread();
530 	} else {
531 		td = rbd->main_td;
532 	}
533 
534 	/* Start the destruct operation on the rbd bdev's
535 	 * main thread.  This guarantees it will only start
536 	 * executing after any messages related to channel
537 	 * deletions have finished completing.  *Always*
538 	 * send a message, even if this function gets called
539 	 * from the main thread, in case there are pending
540 	 * channel delete messages in flight to this thread.
541 	 */
542 	assert(rbd->destruct_td == NULL);
543 	rbd->destruct_td = td;
544 	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);
545 
546 	/* Return 1 to indicate the destruct path is asynchronous. */
547 	return 1;
548 }
549 
550 static void
551 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
552 		    bool success)
553 {
554 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
555 
556 	if (!success) {
557 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
558 		return;
559 	}
560 
561 	bdev_rbd_start_aio(disk,
562 			   bdev_io,
563 			   bdev_io->u.bdev.iovs,
564 			   bdev_io->u.bdev.iovcnt,
565 			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
566 			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
567 }
568 
569 static void
570 _bdev_rbd_submit_request(void *ctx)
571 {
572 	struct spdk_bdev_io *bdev_io = ctx;
573 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
574 
575 	switch (bdev_io->type) {
576 	case SPDK_BDEV_IO_TYPE_READ:
577 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
578 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
579 		break;
580 
581 	case SPDK_BDEV_IO_TYPE_WRITE:
582 	case SPDK_BDEV_IO_TYPE_UNMAP:
583 	case SPDK_BDEV_IO_TYPE_FLUSH:
584 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
585 		bdev_rbd_start_aio(disk,
586 				   bdev_io,
587 				   bdev_io->u.bdev.iovs,
588 				   bdev_io->u.bdev.iovcnt,
589 				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
590 				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
591 		break;
592 
593 	case SPDK_BDEV_IO_TYPE_RESET:
594 		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
595 			       bdev_io);
596 		break;
597 
598 	default:
599 		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
600 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
601 		break;
602 	}
603 }
604 
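/* All I/O for a bdev is funneled to the thread that opened the image (main_td);
 * completions are routed back to the submitting thread in bdev_rbd_io_complete(). */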
605 static void
606 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
607 {
608 	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
609 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
610 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
611 
612 	rbd_io->submit_td = submit_td;
613 	if (disk->main_td != submit_td) {
614 		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
615 	} else {
616 		_bdev_rbd_submit_request(bdev_io);
617 	}
618 }
619 
620 static bool
621 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
622 {
623 	switch (io_type) {
624 	case SPDK_BDEV_IO_TYPE_READ:
625 	case SPDK_BDEV_IO_TYPE_WRITE:
626 	case SPDK_BDEV_IO_TYPE_UNMAP:
627 	case SPDK_BDEV_IO_TYPE_FLUSH:
628 	case SPDK_BDEV_IO_TYPE_RESET:
629 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
630 		return true;
631 
632 	default:
633 		return false;
634 	}
635 }
636 
637 static void
638 bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
639 {
640 	assert(disk != NULL);
641 	assert(disk->main_td == spdk_get_thread());
642 	assert(disk->ch_count == 0);
643 
644 	spdk_put_io_channel(disk->group_ch);
645 	if (disk->image) {
646 		bdev_rbd_exit(disk->image);
647 	}
648 
649 	disk->main_td = NULL;
650 	disk->group_ch = NULL;
651 }
652 
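/* Open the rbd image for the data path.  Runs via spdk_call_unaffinitized() so that
 * threads created by librbd do not inherit the SPDK thread's CPU affinity. */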
653 static void *
654 bdev_rbd_handle(void *arg)
655 {
656 	struct bdev_rbd *disk = arg;
657 	void *ret = arg;
658 
659 	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
660 		SPDK_ERRLOG("Failed to open specified rbd device\n");
661 		ret = NULL;
662 	}
663 
664 	return ret;
665 }
666 
667 static int
668 _bdev_rbd_create_cb(struct bdev_rbd *disk)
669 {
670 	disk->group_ch = spdk_get_io_channel(&rbd_if);
671 	assert(disk->group_ch != NULL);
672 
673 	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
674 		bdev_rbd_free_channel_resources(disk);
675 		return -1;
676 	}
677 
678 	return 0;
679 }
680 
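/* I/O channel create callback.  The first channel on a bdev opens the rbd image and
 * records the current thread as main_td; later channels only increment ch_count. */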
681 static int
682 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
683 {
684 	struct bdev_rbd_io_channel *ch = ctx_buf;
685 	struct bdev_rbd *disk = io_device;
686 	int rc;
687 
688 	ch->disk = disk;
689 	pthread_mutex_lock(&disk->mutex);
690 	if (disk->ch_count == 0) {
691 		assert(disk->main_td == NULL);
692 		rc = _bdev_rbd_create_cb(disk);
693 		if (rc) {
694 			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
695 			pthread_mutex_unlock(&disk->mutex);
696 			return rc;
697 		}
698 
699 		disk->main_td = spdk_get_thread();
700 	}
701 
702 	disk->ch_count++;
703 	pthread_mutex_unlock(&disk->mutex);
704 
705 	return 0;
706 }
707 
708 static void
709 _bdev_rbd_destroy_cb(void *ctx)
710 {
711 	struct bdev_rbd *disk = ctx;
712 
713 	pthread_mutex_lock(&disk->mutex);
714 	assert(disk->ch_count > 0);
715 	disk->ch_count--;
716 
717 	if (disk->ch_count > 0) {
718 		/* A new channel was created between when the message was sent and when this function executed. */
719 		pthread_mutex_unlock(&disk->mutex);
720 		return;
721 	}
722 
723 	bdev_rbd_free_channel_resources(disk);
724 	pthread_mutex_unlock(&disk->mutex);
725 }
726 
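/* I/O channel destroy callback.  When the last channel goes away, the image and the
 * group channel are released; if that happens on a thread other than main_td, the
 * work is forwarded there. */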
727 static void
728 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
729 {
730 	struct bdev_rbd *disk = io_device;
731 	struct spdk_thread *thread;
732 
733 	pthread_mutex_lock(&disk->mutex);
734 	assert(disk->ch_count > 0);
735 	disk->ch_count--;
736 	if (disk->ch_count == 0) {
737 		assert(disk->main_td != NULL);
738 		if (disk->main_td != spdk_get_thread()) {
739 			/* The final channel was destroyed on a different thread than the one
740 			 * where the first channel was created. Pass a message to the main
741 			 * thread so that it releases the channel resources there. */
742 			disk->ch_count++;
743 			thread = disk->main_td;
744 			pthread_mutex_unlock(&disk->mutex);
745 			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
746 			return;
747 		}
748 
749 		bdev_rbd_free_channel_resources(disk);
750 	}
751 	pthread_mutex_unlock(&disk->mutex);
752 }
753 
754 static struct spdk_io_channel *
755 bdev_rbd_get_io_channel(void *ctx)
756 {
757 	struct bdev_rbd *rbd_bdev = ctx;
758 
759 	return spdk_get_io_channel(rbd_bdev);
760 }
761 
762 static void
763 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
764 {
765 	struct bdev_rbd_cluster *entry;
766 
767 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
768 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
769 		if (strcmp(cluster_name, entry->name)) {
770 			continue;
771 		}
772 		if (entry->user_id) {
773 			spdk_json_write_named_string(w, "user_id", entry->user_id);
774 		}
775 
776 		if (entry->config_param) {
777 			char **config_entry = entry->config_param;
778 
779 			spdk_json_write_named_object_begin(w, "config_param");
780 			while (*config_entry) {
781 				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
782 				config_entry += 2;
783 			}
784 			spdk_json_write_object_end(w);
785 		}
786 		if (entry->config_file) {
787 			spdk_json_write_named_string(w, "config_file", entry->config_file);
788 		}
789 		if (entry->key_file) {
790 			spdk_json_write_named_string(w, "key_file", entry->key_file);
791 		}
792 
793 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
794 		return;
795 	}
796 
797 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
798 }
799 
800 static int
801 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
802 {
803 	struct bdev_rbd *rbd_bdev = ctx;
804 
805 	spdk_json_write_named_object_begin(w, "rbd");
806 
807 	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
808 
809 	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
810 
811 	if (rbd_bdev->cluster_name) {
812 		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
813 		goto end;
814 	}
815 
816 	if (rbd_bdev->user_id) {
817 		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
818 	}
819 
820 	if (rbd_bdev->config) {
821 		char **entry = rbd_bdev->config;
822 
823 		spdk_json_write_named_object_begin(w, "config");
824 		while (*entry) {
825 			spdk_json_write_named_string(w, entry[0], entry[1]);
826 			entry += 2;
827 		}
828 		spdk_json_write_object_end(w);
829 	}
830 
831 end:
832 	spdk_json_write_object_end(w);
833 
834 	return 0;
835 }
836 
837 static void
838 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
839 {
840 	struct bdev_rbd *rbd = bdev->ctxt;
841 	char uuid_str[SPDK_UUID_STRING_LEN];
842 
843 	spdk_json_write_object_begin(w);
844 
845 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
846 
847 	spdk_json_write_named_object_begin(w, "params");
848 	spdk_json_write_named_string(w, "name", bdev->name);
849 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
850 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
851 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
852 	if (rbd->user_id) {
853 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
854 	}
855 
856 	if (rbd->config) {
857 		char **entry = rbd->config;
858 
859 		spdk_json_write_named_object_begin(w, "config");
860 		while (*entry) {
861 			spdk_json_write_named_string(w, entry[0], entry[1]);
862 			entry += 2;
863 		}
864 		spdk_json_write_object_end(w);
865 	}
866 
867 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
868 	spdk_json_write_named_string(w, "uuid", uuid_str);
869 
870 	spdk_json_write_object_end(w);
871 
872 	spdk_json_write_object_end(w);
873 }
874 
875 static void
876 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
877 {
878 	assert(entry != NULL);
879 
880 	spdk_json_write_object_begin(w);
881 	spdk_json_write_named_string(w, "cluster_name", entry->name);
882 
883 	if (entry->user_id) {
884 		spdk_json_write_named_string(w, "user_id", entry->user_id);
885 	}
886 
887 	if (entry->config_param) {
888 		char **config_entry = entry->config_param;
889 
890 		spdk_json_write_named_object_begin(w, "config_param");
891 		while (*config_entry) {
892 			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
893 			config_entry += 2;
894 		}
895 		spdk_json_write_object_end(w);
896 	}
897 	if (entry->config_file) {
898 		spdk_json_write_named_string(w, "config_file", entry->config_file);
899 	}
900 	if (entry->key_file) {
901 		spdk_json_write_named_string(w, "key_file", entry->key_file);
902 	}
903 
904 	spdk_json_write_object_end(w);
905 }
906 
907 int
908 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
909 {
910 	struct bdev_rbd_cluster *entry;
911 	struct spdk_json_write_ctx *w;
912 
913 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
914 
915 	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
916 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
917 		return -ENOENT;
918 	}
919 
920 	/* If cluster name is provided */
921 	if (name) {
922 		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
923 			if (strcmp(name, entry->name) == 0) {
924 				w = spdk_jsonrpc_begin_result(request);
925 				dump_single_cluster_entry(entry, w);
926 				spdk_jsonrpc_end_result(request, w);
927 
928 				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
929 				return 0;
930 			}
931 		}
932 
933 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
934 		return -ENOENT;
935 	}
936 
937 	w = spdk_jsonrpc_begin_result(request);
938 	spdk_json_write_array_begin(w);
939 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
940 		dump_single_cluster_entry(entry, w);
941 	}
942 	spdk_json_write_array_end(w);
943 	spdk_jsonrpc_end_result(request, w);
944 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
945 
946 	return 0;
947 }
948 
949 static const struct spdk_bdev_fn_table rbd_fn_table = {
950 	.destruct		= bdev_rbd_destruct,
951 	.submit_request		= bdev_rbd_submit_request,
952 	.io_type_supported	= bdev_rbd_io_type_supported,
953 	.get_io_channel		= bdev_rbd_get_io_channel,
954 	.dump_info_json		= bdev_rbd_dump_info_json,
955 	.write_config_json	= bdev_rbd_write_config_json,
956 };
957 
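/* Create and connect a named rados cluster that multiple rbd bdevs can share by
 * passing cluster_name to bdev_rbd_create(). */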
958 static int
959 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
960 		     const char *config_file, const char *key_file)
961 {
962 	struct bdev_rbd_cluster *entry;
963 	int rc;
964 
965 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
966 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
967 		if (strcmp(name, entry->name) == 0) {
968 			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
969 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
970 			return -1;
971 		}
972 	}
973 
974 	entry = calloc(1, sizeof(*entry));
975 	if (!entry) {
976 		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
977 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
978 		return -1;
979 	}
980 
981 	entry->name = strdup(name);
982 	if (entry->name == NULL) {
983 		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
984 		goto err_handle;
985 	}
986 
987 	if (user_id) {
988 		entry->user_id = strdup(user_id);
989 		if (entry->user_id == NULL) {
990 			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
991 			goto err_handle;
992 		}
993 	}
994 
995 	/* config_param and config_file may each be specified on their own, or both together. */
996 	if (config_param) {
997 		entry->config_param = bdev_rbd_dup_config(config_param);
998 		if (entry->config_param == NULL) {
999 			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
1000 			goto err_handle;
1001 		}
1002 	}
1003 
1004 	if (config_file) {
1005 		entry->config_file = strdup(config_file);
1006 		if (entry->config_file == NULL) {
1007 			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
1008 			goto err_handle;
1009 		}
1010 	}
1011 
1012 	if (key_file) {
1013 		entry->key_file = strdup(key_file);
1014 		if (entry->key_file == NULL) {
1015 			SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry);
1016 			goto err_handle;
1017 		}
1018 	}
1019 
1020 	rc = rados_create(&entry->cluster, user_id);
1021 	if (rc < 0) {
1022 		SPDK_ERRLOG("Failed to create rados_t struct\n");
1023 		goto err_handle;
1024 	}
1025 
1026 	/* When entry->config_file is NULL, try the default config location and ignore any read failure. */
1027 	rc = rados_conf_read_file(entry->cluster, entry->config_file);
1028 	if (entry->config_file && rc < 0) {
1029 		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
1030 		rados_shutdown(entry->cluster);
1031 		goto err_handle;
1032 	}
1033 
1034 	if (config_param) {
1035 		const char *const *config_entry = config_param;
1036 		while (*config_entry) {
1037 			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
1038 			if (rc < 0) {
1039 				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
1040 				rados_shutdown(entry->cluster);
1041 				goto err_handle;
1042 			}
1043 			config_entry += 2;
1044 		}
1045 	}
1046 
1047 	if (key_file) {
1048 		rc = rados_conf_set(entry->cluster, "keyring", key_file);
1049 		if (rc < 0) {
1050 			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
1051 			rados_shutdown(entry->cluster);
1052 			goto err_handle;
1053 		}
1054 	}
1055 
1056 	rc = rados_connect(entry->cluster);
1057 	if (rc < 0) {
1058 		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
1059 		rados_shutdown(entry->cluster);
1060 		goto err_handle;
1061 	}
1062 
1063 	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
1064 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1065 
1066 	return 0;
1067 
1068 err_handle:
1069 	bdev_rbd_cluster_free(entry);
1070 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1071 	return -1;
1072 }
1073 
1074 int
1075 bdev_rbd_unregister_cluster(const char *name)
1076 {
1077 	struct bdev_rbd_cluster *entry;
1078 	int rc = 0;
1079 
1080 	if (name == NULL) {
1081 		return -1;
1082 	}
1083 
1084 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
1085 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
1086 		if (strcmp(name, entry->name) == 0) {
1087 			if (entry->ref == 0) {
1088 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
1089 				rados_shutdown(entry->cluster);
1090 				bdev_rbd_cluster_free(entry);
1091 			} else {
1092 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
1093 					    entry->name);
1094 				rc = -1;
1095 			}
1096 
1097 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1098 			return rc;
1099 		}
1100 	}
1101 
1102 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1103 
1104 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
1105 
1106 	return -1;
1107 }
1108 
1109 static void *
1110 _bdev_rbd_register_cluster(void *arg)
1111 {
1112 	struct cluster_register_info *info = arg;
1113 	void *ret = arg;
1114 	int rc;
1115 
1116 	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
1117 				  (const char *const *)info->config_param, (const char *)info->config_file,
1118 				  (const char *)info->key_file);
1119 	if (rc) {
1120 		ret = NULL;
1121 	}
1122 
1123 	return ret;
1124 }
1125 
1126 int
1127 bdev_rbd_register_cluster(struct cluster_register_info *info)
1128 {
1129 	assert(info != NULL);
1130 
1131 	/* The Rados cluster must be created on a non-SPDK thread to avoid contending
1132 	 * for CPU resources with the SPDK threads. */
1133 	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
1134 		return -1;
1135 	}
1136 
1137 	return 0;
1138 }
1139 
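/* Create and register an rbd bdev.  The rados connection is either private (built
 * from user_id/config) or taken from a cluster previously registered under
 * cluster_name. */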
1140 int
1141 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
1142 		const char *pool_name,
1143 		const char *const *config,
1144 		const char *rbd_name,
1145 		uint32_t block_size,
1146 		const char *cluster_name,
1147 		const struct spdk_uuid *uuid)
1148 {
1149 	struct bdev_rbd *rbd;
1150 	int ret;
1151 
1152 	if ((pool_name == NULL) || (rbd_name == NULL)) {
1153 		return -EINVAL;
1154 	}
1155 
1156 	rbd = calloc(1, sizeof(struct bdev_rbd));
1157 	if (rbd == NULL) {
1158 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
1159 		return -ENOMEM;
1160 	}
1161 
1162 	ret = pthread_mutex_init(&rbd->mutex, NULL);
1163 	if (ret) {
1164 		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name);
1165 		free(rbd);
1166 		return ret;
1167 	}
1168 
1169 	rbd->rbd_name = strdup(rbd_name);
1170 	if (!rbd->rbd_name) {
1171 		bdev_rbd_free(rbd);
1172 		return -ENOMEM;
1173 	}
1174 
1175 	if (user_id) {
1176 		rbd->user_id = strdup(user_id);
1177 		if (!rbd->user_id) {
1178 			bdev_rbd_free(rbd);
1179 			return -ENOMEM;
1180 		}
1181 	}
1182 
1183 	if (cluster_name) {
1184 		rbd->cluster_name = strdup(cluster_name);
1185 		if (!rbd->cluster_name) {
1186 			bdev_rbd_free(rbd);
1187 			return -ENOMEM;
1188 		}
1189 	}
1190 	rbd->pool_name = strdup(pool_name);
1191 	if (!rbd->pool_name) {
1192 		bdev_rbd_free(rbd);
1193 		return -ENOMEM;
1194 	}
1195 
1196 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
1197 		bdev_rbd_free(rbd);
1198 		return -ENOMEM;
1199 	}
1200 
1201 	ret = bdev_rbd_init(rbd);
1202 	if (ret < 0) {
1203 		bdev_rbd_free(rbd);
1204 		SPDK_ERRLOG("Failed to init rbd device\n");
1205 		return ret;
1206 	}
1207 
1208 	if (uuid) {
1209 		rbd->disk.uuid = *uuid;
1210 	} else {
1211 		spdk_uuid_generate(&rbd->disk.uuid);
1212 	}
1213 
1214 	if (name) {
1215 		rbd->disk.name = strdup(name);
1216 	} else {
1217 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
1218 	}
1219 	if (!rbd->disk.name) {
1220 		bdev_rbd_free(rbd);
1221 		return -ENOMEM;
1222 	}
1223 	rbd->disk.product_name = "Ceph Rbd Disk";
1224 	bdev_rbd_count++;
1225 
1226 	rbd->disk.write_cache = 0;
1227 	rbd->disk.blocklen = block_size;
1228 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
1229 	rbd->disk.ctxt = rbd;
1230 	rbd->disk.fn_table = &rbd_fn_table;
1231 	rbd->disk.module = &rbd_if;
1232 
1233 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
1234 
1235 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
1236 				bdev_rbd_destroy_cb,
1237 				sizeof(struct bdev_rbd_io_channel),
1238 				rbd_name);
1239 	ret = spdk_bdev_register(&rbd->disk);
1240 	if (ret) {
1241 		spdk_io_device_unregister(rbd, NULL);
1242 		bdev_rbd_free(rbd);
1243 		return ret;
1244 	}
1245 
1246 	*bdev = &(rbd->disk);
1247 
1248 	return ret;
1249 }
1250 
1251 void
1252 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1253 {
1254 	int rc;
1255 
1256 	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
1257 	if (rc != 0) {
1258 		cb_fn(cb_arg, rc);
1259 	}
1260 }
1261 
1262 static void
1263 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
1264 {
1265 }
1266 
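/* Grow an existing rbd bdev to new_size_in_mb.  The image is resized through a
 * temporary I/O channel and the bdev block count is then updated to match. */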
1267 int
1268 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
1269 {
1270 	struct spdk_bdev_desc *desc;
1271 	struct spdk_bdev *bdev;
1272 	struct spdk_io_channel *ch;
1273 	struct bdev_rbd_io_channel *rbd_io_ch;
1274 	int rc = 0;
1275 	uint64_t new_size_in_byte;
1276 	uint64_t current_size_in_mb;
1277 
1278 	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
1279 	if (rc != 0) {
1280 		return rc;
1281 	}
1282 
1283 	bdev = spdk_bdev_desc_get_bdev(desc);
1284 
1285 	if (bdev->module != &rbd_if) {
1286 		rc = -EINVAL;
1287 		goto exit;
1288 	}
1289 
1290 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
1291 	if (current_size_in_mb > new_size_in_mb) {
1292 		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
1293 		rc = -EINVAL;
1294 		goto exit;
1295 	}
1296 
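	/* Note: disk is the first member of struct bdev_rbd, so the bdev pointer is also
	 * the io_device pointer registered in bdev_rbd_create(). */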
1297 	ch = bdev_rbd_get_io_channel(bdev);
1298 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
1299 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
1300 
1301 	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
1302 	spdk_put_io_channel(ch);
1303 	if (rc != 0) {
1304 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
1305 		goto exit;
1306 	}
1307 
1308 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
1309 	if (rc != 0) {
1310 		SPDK_ERRLOG("failed to notify block cnt change.\n");
1311 	}
1312 
1313 exit:
1314 	spdk_bdev_close(desc);
1315 	return rc;
1316 }
1317 
1318 static int
1319 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
1320 {
1321 	return 0;
1322 }
1323 
1324 static void
1325 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
1326 {
1327 }
1328 
1329 static int
1330 bdev_rbd_library_init(void)
1331 {
1332 	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
1333 				0, "bdev_rbd_poll_groups");
1334 	return 0;
1335 }
1336 
1337 static void
1338 bdev_rbd_library_fini(void)
1339 {
1340 	spdk_io_device_unregister(&rbd_if, NULL);
1341 }
1342 
1343 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1344