xref: /spdk/lib/sock/sock.c (revision 1fa071d332db21bf893d581a8e93b425ba788a24)
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/log.h"
#include "spdk/sock.h"
#include "spdk_internal/sock.h"
#include "spdk/queue.h"

static STAILQ_HEAD(, spdk_net_impl) g_net_impls = STAILQ_HEAD_INITIALIZER(g_net_impls);

struct spdk_sock_placement_id_entry {
	int placement_id;
	uint32_t ref;
	struct spdk_sock_group *group;
	STAILQ_ENTRY(spdk_sock_placement_id_entry) link;
};

static STAILQ_HEAD(, spdk_sock_placement_id_entry) g_placement_id_map = STAILQ_HEAD_INITIALIZER(
			g_placement_id_map);
static pthread_mutex_t g_map_table_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Insert a group into the placement map.
 * If an entry for this placement_id already exists, just take another reference on it.
 */
static int
spdk_sock_map_insert(int placement_id, struct spdk_sock_group *group)
{
	struct spdk_sock_placement_id_entry *entry;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			/* The mapping already exists; multiple sockets share
			 * this placement_id.
			 */
			entry->ref++;
			pthread_mutex_unlock(&g_map_table_mutex);
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for placement_id=%d\n", placement_id);
		pthread_mutex_unlock(&g_map_table_mutex);
		return -ENOMEM;
	}

	entry->placement_id = placement_id;
	entry->group = group;
	entry->ref++;

	STAILQ_INSERT_TAIL(&g_placement_id_map, entry, link);
	pthread_mutex_unlock(&g_map_table_mutex);

	return 0;
}

/* Release a reference to the group for a given placement_id.
 * The entry itself stays in the map; entries are only removed when their
 * group is destroyed (see spdk_sock_remove_sock_group_from_map_table()).
 */
static void
spdk_sock_map_release(int placement_id)
{
	struct spdk_sock_placement_id_entry *entry;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			assert(entry->ref > 0);
			entry->ref--;
			break;
		}
	}

	pthread_mutex_unlock(&g_map_table_mutex);
}

/* Look up the group for a placement_id. */
static void
spdk_sock_map_lookup(int placement_id, struct spdk_sock_group **group)
{
	struct spdk_sock_placement_id_entry *entry;

	*group = NULL;
	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			assert(entry->group != NULL);
			*group = entry->group;
			break;
		}
	}
	pthread_mutex_unlock(&g_map_table_mutex);
}

/* Remove every entry that references the given socket group from the map table. */
static void
spdk_sock_remove_sock_group_from_map_table(struct spdk_sock_group *group)
{
	struct spdk_sock_placement_id_entry *entry, *tmp;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH_SAFE(entry, &g_placement_id_map, link, tmp) {
		if (entry->group == group) {
			STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link);
			free(entry);
		}
	}
	pthread_mutex_unlock(&g_map_table_mutex);
}

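/* Find the socket group, if any, that other sockets sharing this socket's
 * placement_id were added to.  Returns 0 and fills *group (NULL if no mapping
 * exists) when a valid, non-zero placement_id is available, or -1 otherwise.
 */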
int
spdk_sock_get_optimal_sock_group(struct spdk_sock *sock, struct spdk_sock_group **group)
{
	int placement_id = 0, rc;

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		spdk_sock_map_lookup(placement_id, group);
		return 0;
	} else {
		return -1;
	}
}

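/* Query the address strings and ports associated with a socket.  The actual
 * lookup is delegated to the underlying net implementation.
 */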
int
spdk_sock_getaddr(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport,
		  char *caddr, int clen, uint16_t *cport)
{
	return sock->net_impl->getaddr(sock, saddr, slen, sport, caddr, clen, cport);
}

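/* Create an outgoing connection.  Each registered net implementation is tried
 * in priority order; if impl_name is given, only the implementation with that
 * name is used.  Returns the first successfully connected socket, or NULL.
 */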
struct spdk_sock *
spdk_sock_connect(const char *ip, int port, char *impl_name)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock *sock;

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) {
			continue;
		}

		sock = impl->connect(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			TAILQ_INIT(&sock->queued_reqs);
			TAILQ_INIT(&sock->pending_reqs);
			return sock;
		}
	}

	return NULL;
}

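/* Create a listening socket.  Implementation selection works the same way as
 * in spdk_sock_connect().  Returns NULL if no implementation can listen on
 * the given ip and port.
 */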
struct spdk_sock *
spdk_sock_listen(const char *ip, int port, char *impl_name)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock *sock;

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) {
			continue;
		}

		sock = impl->listen(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			/* Don't need to initialize the request queues for listen
			 * sockets. */
			return sock;
		}
	}

	return NULL;
}

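/* Accept a new connection on a listening socket.  The new socket inherits the
 * net implementation of the listening socket.
 */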
struct spdk_sock *
spdk_sock_accept(struct spdk_sock *sock)
{
	struct spdk_sock *new_sock;

	new_sock = sock->net_impl->accept(sock);
	if (new_sock != NULL) {
		new_sock->net_impl = sock->net_impl;
		TAILQ_INIT(&new_sock->queued_reqs);
		TAILQ_INIT(&new_sock->pending_reqs);
	}

	return new_sock;
}

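/* Close a socket and clear the caller's pointer on success.  A socket that is
 * still part of a socket group cannot be closed, and destruction is deferred
 * while callbacks are still executing on the socket.
 */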
int
spdk_sock_close(struct spdk_sock **_sock)
{
	struct spdk_sock *sock = *_sock;
	int rc;

	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->cb_fn != NULL) {
		/* This sock is still part of a sock_group. */
		errno = EBUSY;
		return -1;
	}

	sock->flags.closed = true;

	if (sock->cb_cnt > 0) {
		/* Let the callback unwind before destroying the socket */
		return 0;
	}

	spdk_sock_abort_requests(sock);

	rc = sock->net_impl->close(sock);
	if (rc == 0) {
		*_sock = NULL;
	}

	return rc;
}

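/* Synchronous I/O wrappers.  Each call is rejected with EBADF if the socket is
 * NULL or already marked closed, and is otherwise forwarded to the net
 * implementation.
 */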
ssize_t
spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->recv(sock, buf, len);
}

ssize_t
spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->readv(sock, iov, iovcnt);
}

ssize_t
spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->writev(sock, iov, iovcnt);
}

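/* Queue an asynchronous writev.  The request's callback is invoked with -EBADF
 * immediately if the socket is NULL or closed; otherwise the request is handed
 * to the net implementation, which invokes the callback on completion.
 */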
void
spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->cb_fn != NULL);

	if (sock == NULL) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	if (sock->flags.closed) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	sock->net_impl->writev_async(sock, req);
}

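/* Flush queued data for a socket.  Unlike the wrappers above, errors are
 * returned as negative errno values rather than through errno.
 */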
int
spdk_sock_flush(struct spdk_sock *sock)
{
	if (sock == NULL) {
		return -EBADF;
	}

	if (sock->flags.closed) {
		return -EBADF;
	}

	return sock->net_impl->flush(sock);
}

int
spdk_sock_set_recvlowat(struct spdk_sock *sock, int nbytes)
{
	return sock->net_impl->set_recvlowat(sock, nbytes);
}

int
spdk_sock_set_recvbuf(struct spdk_sock *sock, int sz)
{
	return sock->net_impl->set_recvbuf(sock, sz);
}

int
spdk_sock_set_sendbuf(struct spdk_sock *sock, int sz)
{
	return sock->net_impl->set_sendbuf(sock, sz);
}

bool
spdk_sock_is_ipv6(struct spdk_sock *sock)
{
	return sock->net_impl->is_ipv6(sock);
}

bool
spdk_sock_is_ipv4(struct spdk_sock *sock)
{
	return sock->net_impl->is_ipv4(sock);
}

bool
spdk_sock_is_connected(struct spdk_sock *sock)
{
	return sock->net_impl->is_connected(sock);
}

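/* Create a socket group.  A group implementation is created for every
 * registered net implementation, so sockets from different implementations
 * can be polled through the same group.
 */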
struct spdk_sock_group *
spdk_sock_group_create(void *ctx)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock_group *group;
	struct spdk_sock_group_impl *group_impl;

	group = calloc(1, sizeof(*group));
	if (group == NULL) {
		return NULL;
	}

	STAILQ_INIT(&group->group_impls);

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		group_impl = impl->group_impl_create();
		if (group_impl != NULL) {
			STAILQ_INSERT_TAIL(&group->group_impls, group_impl, link);
			TAILQ_INIT(&group_impl->socks);
			group_impl->num_removed_socks = 0;
			group_impl->net_impl = impl;
		}
	}

	group->ctx = ctx;
	return group;
}

void *
spdk_sock_group_get_ctx(struct spdk_sock_group *group)
{
	if (group == NULL) {
		return NULL;
	}

	return group->ctx;
}

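/* Add a socket to a group and register the callback invoked when the socket is
 * ready.  The group is also recorded in the placement_id map when the net
 * implementation reports a non-zero placement_id.
 *
 * A minimal usage sketch (the callback and context names are hypothetical):
 *
 *	static void my_sock_cb(void *arg, struct spdk_sock_group *group, struct spdk_sock *sock)
 *	{
 *		// handle the readable socket, e.g. with spdk_sock_recv()
 *	}
 *
 *	spdk_sock_group_add_sock(group, sock, my_sock_cb, my_ctx);
 */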
int
spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock,
			 spdk_sock_cb cb_fn, void *cb_arg)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, placement_id = 0;

	if (cb_fn == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (sock->group_impl != NULL) {
		/* This sock is already part of a sock_group.  Adding a sock to
		 * more than one group is currently not supported.
		 */
		errno = EBUSY;
		return -1;
	}

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		rc = spdk_sock_map_insert(placement_id, group);
		if (rc < 0) {
			/* spdk_sock_map_insert() returns a negative errno. */
			errno = -rc;
			return -1;
		}
	}

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		if (sock->net_impl == group_impl->net_impl) {
			break;
		}
	}

	if (group_impl == NULL) {
		errno = EINVAL;
		return -1;
	}

	rc = group_impl->net_impl->group_impl_add_sock(group_impl, sock);
	if (rc == 0) {
		TAILQ_INSERT_TAIL(&group_impl->socks, sock, link);
		sock->group_impl = group_impl;
		sock->cb_fn = cb_fn;
		sock->cb_arg = cb_arg;
	}

	return rc;
}

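/* Remove a socket from its group.  The socket is also remembered in
 * removed_socks so that events already harvested for it in the current poll
 * iteration are not dispatched, and its placement_id reference is released.
 */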
int
spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, placement_id = 0;

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		if (sock->net_impl == group_impl->net_impl) {
			break;
		}
	}

	if (group_impl == NULL) {
		errno = EINVAL;
		return -1;
	}

	assert(group_impl == sock->group_impl);

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		spdk_sock_map_release(placement_id);
	}

	rc = group_impl->net_impl->group_impl_remove_sock(group_impl, sock);
	if (rc == 0) {
		TAILQ_REMOVE(&group_impl->socks, sock, link);
		assert(group_impl->num_removed_socks < MAX_EVENTS_PER_POLL);
		group_impl->removed_socks[group_impl->num_removed_socks] = (uintptr_t)sock;
		group_impl->num_removed_socks++;
		sock->group_impl = NULL;
		sock->cb_fn = NULL;
		sock->cb_arg = NULL;
	}

	return rc;
}

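/* Poll a group for up to MAX_EVENTS_PER_POLL events.  Intended to be called
 * repeatedly, typically from a poller.
 */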
int
spdk_sock_group_poll(struct spdk_sock_group *group)
{
	return spdk_sock_group_poll_count(group, MAX_EVENTS_PER_POLL);
}

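/* Poll a single group implementation and dispatch callbacks for every returned
 * socket that was not removed from the group during this iteration.
 */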
static int
spdk_sock_group_impl_poll_count(struct spdk_sock_group_impl *group_impl,
				struct spdk_sock_group *group,
				int max_events)
{
	struct spdk_sock *socks[MAX_EVENTS_PER_POLL];
	int num_events, i;

	if (TAILQ_EMPTY(&group_impl->socks)) {
		return 0;
	}

	/* The number of removed sockets should be reset for each call to poll. */
	group_impl->num_removed_socks = 0;

	num_events = group_impl->net_impl->group_impl_poll(group_impl, max_events, socks);
	if (num_events == -1) {
		return -1;
	}

	for (i = 0; i < num_events; i++) {
		struct spdk_sock *sock = socks[i];
		int j;
		bool valid = true;

		for (j = 0; j < group_impl->num_removed_socks; j++) {
			if ((uintptr_t)sock == group_impl->removed_socks[j]) {
				valid = false;
				break;
			}
		}

		if (valid) {
			assert(sock->cb_fn != NULL);
			sock->cb_fn(sock->cb_arg, group, sock);
		}
	}

	return num_events;
}

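/* Poll every group implementation in a group for up to max_events events each.
 * Returns the total number of events dispatched, or -1 if any implementation
 * failed.
 */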
int
spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, num_events = 0;

	if (max_events < 1) {
		errno = EINVAL;
		return -1;
	}

	/*
	 * Only poll for up to MAX_EVENTS_PER_POLL events at a time - if more
	 *  events are pending, the next call to this function will reap them.
	 */
	if (max_events > MAX_EVENTS_PER_POLL) {
		max_events = MAX_EVENTS_PER_POLL;
	}

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		rc = spdk_sock_group_impl_poll_count(group_impl, group, max_events);
		if (rc < 0) {
			num_events = -1;
			SPDK_ERRLOG("group_impl_poll_count for net(%s) failed\n",
				    group_impl->net_impl->name);
		} else if (num_events >= 0) {
			num_events += rc;
		}
	}

	return num_events;
}

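/* Close a socket group.  Fails with EBUSY if any sockets are still attached;
 * otherwise every group implementation is closed and the group's placement_id
 * entries are dropped from the map.
 */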
int
spdk_sock_group_close(struct spdk_sock_group **group)
{
	struct spdk_sock_group_impl *group_impl = NULL, *tmp;
	int rc;

	if (*group == NULL) {
		errno = EBADF;
		return -1;
	}

	STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) {
		if (!TAILQ_EMPTY(&group_impl->socks)) {
			errno = EBUSY;
			return -1;
		}
	}

	STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) {
		rc = group_impl->net_impl->group_impl_close(group_impl);
		if (rc != 0) {
			SPDK_ERRLOG("group_impl_close for net(%s) failed\n",
				    group_impl->net_impl->name);
		}
	}

	spdk_sock_remove_sock_group_from_map_table(*group);
	free(*group);
	*group = NULL;

	return 0;
}

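/* Register a net implementation, keeping g_net_impls sorted by descending
 * priority so that higher-priority implementations are tried first in
 * spdk_sock_connect() and spdk_sock_listen().
 */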
void
spdk_net_impl_register(struct spdk_net_impl *impl, int priority)
{
	struct spdk_net_impl *cur, *prev;

	impl->priority = priority;
	prev = NULL;
	STAILQ_FOREACH(cur, &g_net_impls, link) {
		if (impl->priority > cur->priority) {
			break;
		}
		prev = cur;
	}

	if (prev) {
		STAILQ_INSERT_AFTER(&g_net_impls, prev, impl, link);
	} else {
		STAILQ_INSERT_HEAD(&g_net_impls, impl, link);
	}
}