/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/log.h"
#include "spdk/sock.h"
#include "spdk_internal/sock.h"
#include "spdk/queue.h"

static STAILQ_HEAD(, spdk_net_impl) g_net_impls = STAILQ_HEAD_INITIALIZER(g_net_impls);

struct spdk_sock_placement_id_entry {
	int placement_id;
	uint32_t ref;
	struct spdk_sock_group *group;
	STAILQ_ENTRY(spdk_sock_placement_id_entry) link;
};

static STAILQ_HEAD(, spdk_sock_placement_id_entry) g_placement_id_map = STAILQ_HEAD_INITIALIZER(
			g_placement_id_map);
static pthread_mutex_t g_map_table_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Insert a group into the placement map.
 * If an entry for the placement_id already exists, just take another reference.
 */
static int
spdk_sock_map_insert(int placement_id, struct spdk_sock_group *group)
{
	struct spdk_sock_placement_id_entry *entry;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			/* The mapping already exists, which means that different sockets
			 * share the same placement_id.
			 */
			entry->ref++;
			pthread_mutex_unlock(&g_map_table_mutex);
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for placement_id=%d\n", placement_id);
		pthread_mutex_unlock(&g_map_table_mutex);
		return -ENOMEM;
	}

	entry->placement_id = placement_id;
	entry->group = group;
	entry->ref++;

	STAILQ_INSERT_TAIL(&g_placement_id_map, entry, link);
	pthread_mutex_unlock(&g_map_table_mutex);

	return 0;
}

/* Release a reference to the group for a given placement_id.
 * If the reference count drops to 0, remove the entry from the map.
 */
static void
spdk_sock_map_release(int placement_id)
{
	struct spdk_sock_placement_id_entry *entry;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			assert(entry->ref > 0);
			entry->ref--;
			if (!entry->ref) {
				STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link);
				free(entry);
			}
			break;
		}
	}

	pthread_mutex_unlock(&g_map_table_mutex);
}

/* Look up the group for a placement_id. */
static void
spdk_sock_map_lookup(int placement_id, struct spdk_sock_group **group)
{
	struct spdk_sock_placement_id_entry *entry;

	*group = NULL;
	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			assert(entry->group != NULL);
			*group = entry->group;
			break;
		}
	}
	pthread_mutex_unlock(&g_map_table_mutex);
}

/* Remove the socket group from the map table. */
static void
spdk_sock_remove_sock_group_from_map_table(struct spdk_sock_group *group)
{
	struct spdk_sock_placement_id_entry *entry, *tmp;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH_SAFE(entry, &g_placement_id_map, link, tmp) {
		if (entry->group == group) {
			STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link);
			free(entry);
		}
	}
	pthread_mutex_unlock(&g_map_table_mutex);
}

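/* Look up the socket group, if any, that was mapped to this socket's placement_id.
 * Returns 0 and stores the group (or NULL if none is mapped) in *group, or -1 if
 * the implementation cannot report a non-zero placement_id for the socket.
 */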
int
spdk_sock_get_optimal_sock_group(struct spdk_sock *sock, struct spdk_sock_group **group)
{
	int placement_id = 0, rc;

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		spdk_sock_map_lookup(placement_id, group);
		return 0;
	} else {
		return -1;
	}
}

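/* Retrieve the server (saddr/sport) and client (caddr/cport) address strings and
 * port numbers for a socket from the underlying net implementation.
 */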
int
spdk_sock_getaddr(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport,
		  char *caddr, int clen, uint16_t *cport)
{
	return sock->net_impl->getaddr(sock, saddr, slen, sport, caddr, clen, cport);
}

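/* Create a socket connected to the given ip and port. Registered net implementations
 * are tried in priority order; if impl_name is non-NULL, only the implementation with
 * that name is considered. The first implementation that returns a socket is used.
 */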
struct spdk_sock *
spdk_sock_connect(const char *ip, int port, char *impl_name)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock *sock;

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) {
			continue;
		}

		sock = impl->connect(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			TAILQ_INIT(&sock->queued_reqs);
			TAILQ_INIT(&sock->pending_reqs);
			return sock;
		}
	}

	return NULL;
}

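/* Create a listening socket on the given ip and port, using the same implementation
 * selection rules as spdk_sock_connect().
 */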
struct spdk_sock *
spdk_sock_listen(const char *ip, int port, char *impl_name)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock *sock;

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) {
			continue;
		}

		sock = impl->listen(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			/* Don't need to initialize the request queues for listen
			 * sockets. */
			return sock;
		}
	}

	return NULL;
}

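/* Accept a new connection on a listening socket. The new socket inherits the net
 * implementation of the listening socket.
 */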
struct spdk_sock *
spdk_sock_accept(struct spdk_sock *sock)
{
	struct spdk_sock *new_sock;

	new_sock = sock->net_impl->accept(sock);
	if (new_sock != NULL) {
		new_sock->net_impl = sock->net_impl;
		TAILQ_INIT(&new_sock->queued_reqs);
		TAILQ_INIT(&new_sock->pending_reqs);
	}

	return new_sock;
}

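/* Close a socket. The socket must first be removed from any socket group (cb_fn must
 * be NULL). If callbacks are still executing on the socket, destruction is deferred
 * until they unwind; otherwise any queued requests are aborted and the implementation's
 * close callback is invoked.
 */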
int
spdk_sock_close(struct spdk_sock **_sock)
{
	struct spdk_sock *sock = *_sock;
	int rc;

	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->cb_fn != NULL) {
		/* This sock is still part of a sock_group. */
		errno = EBUSY;
		return -1;
	}

	sock->flags.closed = true;

	if (sock->cb_cnt > 0) {
		/* Let the callback unwind before destroying the socket */
		return 0;
	}

	spdk_sock_abort_requests(sock);

	rc = sock->net_impl->close(sock);
	if (rc == 0) {
		*_sock = NULL;
	}

	return rc;
}

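/* Synchronous I/O helpers (recv/readv/writev). Each checks for a NULL or closed
 * socket, failing with errno set to EBADF, and otherwise forwards the call to the
 * underlying net implementation.
 */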
ssize_t
spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->recv(sock, buf, len);
}

ssize_t
spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->readv(sock, iov, iovcnt);
}

ssize_t
spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->writev(sock, iov, iovcnt);
}

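/* Submit an asynchronous writev request. If the socket is NULL or already closed,
 * the request's completion callback is invoked immediately with -EBADF.
 */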
void
spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->cb_fn != NULL);

	if (sock == NULL) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	if (sock->flags.closed) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	sock->net_impl->writev_async(sock, req);
}

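/* Flush the socket through the implementation's flush callback. Returns -EBADF if
 * the socket is NULL or already closed.
 */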
int
spdk_sock_flush(struct spdk_sock *sock)
{
	if (sock == NULL) {
		return -EBADF;
	}

	if (sock->flags.closed) {
		return -EBADF;
	}

	return sock->net_impl->flush(sock);
}

int
spdk_sock_set_recvlowat(struct spdk_sock *sock, int nbytes)
{
	return sock->net_impl->set_recvlowat(sock, nbytes);
}

int
spdk_sock_set_recvbuf(struct spdk_sock *sock, int sz)
{
	return sock->net_impl->set_recvbuf(sock, sz);
}

int
spdk_sock_set_sendbuf(struct spdk_sock *sock, int sz)
{
	return sock->net_impl->set_sendbuf(sock, sz);
}

int
spdk_sock_set_priority(struct spdk_sock *sock, int priority)
{
	return sock->net_impl->set_priority(sock, priority);
}

bool
spdk_sock_is_ipv6(struct spdk_sock *sock)
{
	return sock->net_impl->is_ipv6(sock);
}

bool
spdk_sock_is_ipv4(struct spdk_sock *sock)
{
	return sock->net_impl->is_ipv4(sock);
}

bool
spdk_sock_is_connected(struct spdk_sock *sock)
{
	return sock->net_impl->is_connected(sock);
}

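/* Create a socket polling group. A group implementation is created for every
 * registered net implementation so that sockets from any implementation can be
 * added to the group later.
 */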
struct spdk_sock_group *
spdk_sock_group_create(void *ctx)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock_group *group;
	struct spdk_sock_group_impl *group_impl;

	group = calloc(1, sizeof(*group));
	if (group == NULL) {
		return NULL;
	}

	STAILQ_INIT(&group->group_impls);

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		group_impl = impl->group_impl_create();
		if (group_impl != NULL) {
			STAILQ_INSERT_TAIL(&group->group_impls, group_impl, link);
			TAILQ_INIT(&group_impl->socks);
			group_impl->net_impl = impl;
		}
	}

	group->ctx = ctx;
	return group;
}

void *
spdk_sock_group_get_ctx(struct spdk_sock_group *group)
{
	if (group == NULL) {
		return NULL;
	}

	return group->ctx;
}

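/* Add a socket to a polling group and register the callback that will be invoked
 * when the socket becomes ready. If the implementation reports a non-zero
 * placement_id, the group is also recorded in the placement map.
 */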
int
spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock,
			 spdk_sock_cb cb_fn, void *cb_arg)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, placement_id = 0;

	if (cb_fn == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (sock->group_impl != NULL) {
		/*
		 * This sock is already part of a sock_group.  Currently we don't
		 * support this.
		 */
		errno = EBUSY;
		return -1;
	}

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		rc = spdk_sock_map_insert(placement_id, group);
		if (rc < 0) {
			return -1;
		}
	}

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		if (sock->net_impl == group_impl->net_impl) {
			break;
		}
	}

	if (group_impl == NULL) {
		errno = EINVAL;
		return -1;
	}

	rc = group_impl->net_impl->group_impl_add_sock(group_impl, sock);
	if (rc == 0) {
		TAILQ_INSERT_TAIL(&group_impl->socks, sock, link);
		sock->group_impl = group_impl;
		sock->cb_fn = cb_fn;
		sock->cb_arg = cb_arg;
	}

	return rc;
}

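/* Remove a socket from its polling group, clear its callback, and drop the placement
 * map reference taken when the socket was added.
 */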
int
spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, placement_id = 0;

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		if (sock->net_impl == group_impl->net_impl) {
			break;
		}
	}

	if (group_impl == NULL) {
		errno = EINVAL;
		return -1;
	}

	assert(group_impl == sock->group_impl);

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		spdk_sock_map_release(placement_id);
	}

	rc = group_impl->net_impl->group_impl_remove_sock(group_impl, sock);
	if (rc == 0) {
		TAILQ_REMOVE(&group_impl->socks, sock, link);
		sock->group_impl = NULL;
		sock->cb_fn = NULL;
		sock->cb_arg = NULL;
	}

	return rc;
}

int
spdk_sock_group_poll(struct spdk_sock_group *group)
{
	return spdk_sock_group_poll_count(group, MAX_EVENTS_PER_POLL);
}

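/* Poll one group implementation for up to max_events ready sockets and invoke the
 * callback registered for each ready socket.
 */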
static int
spdk_sock_group_impl_poll_count(struct spdk_sock_group_impl *group_impl,
				struct spdk_sock_group *group,
				int max_events)
{
	struct spdk_sock *socks[MAX_EVENTS_PER_POLL];
	int num_events, i;

	if (TAILQ_EMPTY(&group_impl->socks)) {
		return 0;
	}

	num_events = group_impl->net_impl->group_impl_poll(group_impl, max_events, socks);
	if (num_events == -1) {
		return -1;
	}

	for (i = 0; i < num_events; i++) {
		struct spdk_sock *sock = socks[i];

		assert(sock->cb_fn != NULL);
		sock->cb_fn(sock->cb_arg, group, sock);
	}
	return num_events;
}

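/* Poll every group implementation in the group. Returns the total number of events
 * handled, or -1 if any implementation reported an error.
 */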
int
spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, num_events = 0;

	if (max_events < 1) {
		errno = EINVAL;
		return -1;
	}

	/*
	 * Only poll for up to MAX_EVENTS_PER_POLL events at a time - if more
	 *  events are pending, the next call to this function will reap them.
	 */
	if (max_events > MAX_EVENTS_PER_POLL) {
		max_events = MAX_EVENTS_PER_POLL;
	}

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		rc = spdk_sock_group_impl_poll_count(group_impl, group, max_events);
		if (rc < 0) {
			num_events = -1;
			SPDK_ERRLOG("group_impl_poll_count for net(%s) failed\n",
				    group_impl->net_impl->name);
		} else if (num_events >= 0) {
			num_events += rc;
		}
	}

	return num_events;
}

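/* Close a socket group and free it. Fails with EBUSY if any group implementation
 * still has sockets attached; otherwise each implementation's close callback is
 * invoked and the group is removed from the placement map.
 */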
int
spdk_sock_group_close(struct spdk_sock_group **group)
{
	struct spdk_sock_group_impl *group_impl = NULL, *tmp;
	int rc;

	if (*group == NULL) {
		errno = EBADF;
		return -1;
	}

	STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) {
		if (!TAILQ_EMPTY(&group_impl->socks)) {
			errno = EBUSY;
			return -1;
		}
	}

	STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) {
		rc = group_impl->net_impl->group_impl_close(group_impl);
		if (rc != 0) {
			SPDK_ERRLOG("group_impl_close for net(%s) failed\n",
				    group_impl->net_impl->name);
		}
	}

	spdk_sock_remove_sock_group_from_map_table(*group);
	free(*group);
	*group = NULL;

	return 0;
}

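/* Register a net implementation, keeping the list sorted from highest to lowest
 * priority so that higher-priority implementations are tried first.
 */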
void
spdk_net_impl_register(struct spdk_net_impl *impl, int priority)
{
	struct spdk_net_impl *cur, *prev;

	impl->priority = priority;
	prev = NULL;
	STAILQ_FOREACH(cur, &g_net_impls, link) {
		if (impl->priority > cur->priority) {
			break;
		}
		prev = cur;
	}

	if (prev) {
		STAILQ_INSERT_AFTER(&g_net_impls, prev, impl, link);
	} else {
		STAILQ_INSERT_HEAD(&g_net_impls, impl, link);
	}
}
629