/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/log.h"
#include "spdk/sock.h"
#include "spdk_internal/sock.h"
#include "spdk/queue.h"

static STAILQ_HEAD(, spdk_net_impl) g_net_impls = STAILQ_HEAD_INITIALIZER(g_net_impls);

struct spdk_sock_placement_id_entry {
	int placement_id;
	uint32_t ref;
	struct spdk_sock_group *group;
	STAILQ_ENTRY(spdk_sock_placement_id_entry) link;
};

static STAILQ_HEAD(, spdk_sock_placement_id_entry) g_placement_id_map = STAILQ_HEAD_INITIALIZER(
			g_placement_id_map);
static pthread_mutex_t g_map_table_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Insert a group into the placement map.
 * If an entry already exists for this placement_id, take an additional reference.
 */
static int
spdk_sock_map_insert(int placement_id, struct spdk_sock_group *group)
{
	struct spdk_sock_placement_id_entry *entry;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			/* The mapping already exists; this means multiple sockets
			 * share the same placement_id.
			 */
			entry->ref++;
			pthread_mutex_unlock(&g_map_table_mutex);
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for placement_id=%d\n", placement_id);
		pthread_mutex_unlock(&g_map_table_mutex);
		return -ENOMEM;
	}

	entry->placement_id = placement_id;
	entry->group = group;
	entry->ref++;

	STAILQ_INSERT_TAIL(&g_placement_id_map, entry, link);
	pthread_mutex_unlock(&g_map_table_mutex);

	return 0;
}

/* Release a reference to the group for a given placement_id.
 * If the reference count drops to 0, remove the entry.
 */
static void
spdk_sock_map_release(int placement_id)
{
	struct spdk_sock_placement_id_entry *entry;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			assert(entry->ref > 0);
			entry->ref--;
			if (!entry->ref) {
				STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link);
				free(entry);
			}
			break;
		}
	}

	pthread_mutex_unlock(&g_map_table_mutex);
}

/* Look up the group for a placement_id. */
static void
spdk_sock_map_lookup(int placement_id, struct spdk_sock_group **group)
{
	struct spdk_sock_placement_id_entry *entry;

	*group = NULL;
	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH(entry, &g_placement_id_map, link) {
		if (placement_id == entry->placement_id) {
			assert(entry->group != NULL);
			*group = entry->group;
			break;
		}
	}
	pthread_mutex_unlock(&g_map_table_mutex);
}

/* Remove the socket group from the map table */
static void
spdk_sock_remove_sock_group_from_map_table(struct spdk_sock_group *group)
{
	struct spdk_sock_placement_id_entry *entry, *tmp;

	pthread_mutex_lock(&g_map_table_mutex);
	STAILQ_FOREACH_SAFE(entry, &g_placement_id_map, link, tmp) {
		if (entry->group == group) {
			STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link);
			free(entry);
		}
	}
	pthread_mutex_unlock(&g_map_table_mutex);
}

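/* Find the socket group, if any, that has been associated with this socket's
 * placement_id via spdk_sock_group_add_sock(). Returns 0 and sets *group
 * (possibly to NULL) on success, or -1 if no placement_id is available.
 */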
int
spdk_sock_get_optimal_sock_group(struct spdk_sock *sock, struct spdk_sock_group **group)
{
	int placement_id = 0, rc;

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		spdk_sock_map_lookup(placement_id, group);
		return 0;
	} else {
		return -1;
	}
}

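/* Retrieve the server (saddr/sport) and client (caddr/cport) addresses of a
 * connected socket, delegating to the registered net implementation.
 */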
int
spdk_sock_getaddr(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport,
		  char *caddr, int clen, uint16_t *cport)
{
	return sock->net_impl->getaddr(sock, saddr, slen, sport, caddr, clen, cport);
}

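/* Create a client socket connected to ip:port. Each registered net
 * implementation is tried in order; the first one that returns a socket wins
 * and is recorded in sock->net_impl.
 */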
struct spdk_sock *
spdk_sock_connect(const char *ip, int port)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock *sock;

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		sock = impl->connect(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			TAILQ_INIT(&sock->queued_reqs);
			TAILQ_INIT(&sock->pending_reqs);
			return sock;
		}
	}

	return NULL;
}

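/* Create a listening socket bound to ip:port. As with spdk_sock_connect(),
 * the registered net implementations are tried in order.
 */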
struct spdk_sock *
spdk_sock_listen(const char *ip, int port)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock *sock;

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		sock = impl->listen(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			/* Don't need to initialize the request queues for listen
			 * sockets. */
			return sock;
		}
	}

	return NULL;
}

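/* Accept a new connection on a listening socket. The new socket inherits the
 * net implementation of the listening socket.
 */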
struct spdk_sock *
spdk_sock_accept(struct spdk_sock *sock)
{
	struct spdk_sock *new_sock;

	new_sock = sock->net_impl->accept(sock);
	if (new_sock != NULL) {
		new_sock->net_impl = sock->net_impl;
		TAILQ_INIT(&new_sock->queued_reqs);
		TAILQ_INIT(&new_sock->pending_reqs);
	}

	return new_sock;
}

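/* Close a socket and clear the caller's pointer on success. The socket must
 * first be removed from any socket group (i.e. cb_fn == NULL). If callbacks
 * are currently running (cb_cnt > 0), the socket is only marked closed here
 * and the actual close is deferred until the callbacks unwind.
 */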
int
spdk_sock_close(struct spdk_sock **_sock)
{
	struct spdk_sock *sock = *_sock;
	int rc;

	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->cb_fn != NULL) {
		/* This sock is still part of a sock_group. */
		errno = EBUSY;
		return -1;
	}

	sock->flags.closed = true;

	if (sock->cb_cnt > 0) {
		/* Let the callback unwind before destroying the socket */
		return 0;
	}

	spdk_sock_abort_requests(sock);

	rc = sock->net_impl->close(sock);
	if (rc == 0) {
		*_sock = NULL;
	}

	return rc;
}

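/* Synchronous receive. Fails with EBADF if the socket is NULL or already
 * marked closed.
 */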
ssize_t
spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->recv(sock, buf, len);
}

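/* Vectored synchronous receive; same error handling as spdk_sock_recv(). */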
ssize_t
spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->readv(sock, iov, iovcnt);
}

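/* Vectored synchronous send; same error handling as spdk_sock_recv(). */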
ssize_t
spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
	if (sock == NULL) {
		errno = EBADF;
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->writev(sock, iov, iovcnt);
}

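/* Asynchronous vectored send. On error the request's completion callback is
 * invoked immediately with -EBADF; otherwise the request is handed to the net
 * implementation and completes later through req->cb_fn.
 */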
void
spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->cb_fn != NULL);

	if (sock == NULL) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	if (sock->flags.closed) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	sock->net_impl->writev_async(sock, req);
}

int
spdk_sock_set_recvlowat(struct spdk_sock *sock, int nbytes)
{
	return sock->net_impl->set_recvlowat(sock, nbytes);
}

int
spdk_sock_set_recvbuf(struct spdk_sock *sock, int sz)
{
	return sock->net_impl->set_recvbuf(sock, sz);
}

int
spdk_sock_set_sendbuf(struct spdk_sock *sock, int sz)
{
	return sock->net_impl->set_sendbuf(sock, sz);
}

int
spdk_sock_set_priority(struct spdk_sock *sock, int priority)
{
	return sock->net_impl->set_priority(sock, priority);
}

bool
spdk_sock_is_ipv6(struct spdk_sock *sock)
{
	return sock->net_impl->is_ipv6(sock);
}

bool
spdk_sock_is_ipv4(struct spdk_sock *sock)
{
	return sock->net_impl->is_ipv4(sock);
}

bool
spdk_sock_is_connected(struct spdk_sock *sock)
{
	return sock->net_impl->is_connected(sock);
}

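/* Create a socket group containing one group_impl per registered net
 * implementation. The opaque ctx pointer is stored for retrieval via
 * spdk_sock_group_get_ctx().
 */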
struct spdk_sock_group *
spdk_sock_group_create(void *ctx)
{
	struct spdk_net_impl *impl = NULL;
	struct spdk_sock_group *group;
	struct spdk_sock_group_impl *group_impl;

	group = calloc(1, sizeof(*group));
	if (group == NULL) {
		return NULL;
	}

	STAILQ_INIT(&group->group_impls);

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		group_impl = impl->group_impl_create();
		if (group_impl != NULL) {
			STAILQ_INSERT_TAIL(&group->group_impls, group_impl, link);
			TAILQ_INIT(&group_impl->socks);
			group_impl->net_impl = impl;
		}
	}

	group->ctx = ctx;
	return group;
}

void *
spdk_sock_group_get_ctx(struct spdk_sock_group *group)
{
	if (group == NULL) {
		return NULL;
	}

	return group->ctx;
}

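/* Add a socket to a group and register the callback invoked when the socket
 * becomes ready during spdk_sock_group_poll(). A socket may belong to at most
 * one group at a time. If the socket reports a non-zero placement_id, the
 * group is also recorded in the placement map.
 */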
int
spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock,
			 spdk_sock_cb cb_fn, void *cb_arg)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, placement_id = 0;

	if (cb_fn == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (sock->group_impl != NULL) {
		/*
		 * This sock is already part of a sock_group.  Currently we don't
		 *  support this.
		 */
		errno = EBUSY;
		return -1;
	}

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		rc = spdk_sock_map_insert(placement_id, group);
		if (rc < 0) {
			return -1;
		}
	}

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		if (sock->net_impl == group_impl->net_impl) {
			break;
		}
	}

	if (group_impl == NULL) {
		errno = EINVAL;
		return -1;
	}

	rc = group_impl->net_impl->group_impl_add_sock(group_impl, sock);
	if (rc == 0) {
		TAILQ_INSERT_TAIL(&group_impl->socks, sock, link);
		sock->group_impl = group_impl;
		sock->cb_fn = cb_fn;
		sock->cb_arg = cb_arg;
	}

	return rc;
}

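/* Remove a socket from its group and clear its callback. Also drops the
 * placement map reference taken by spdk_sock_group_add_sock(), if any.
 */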
int
spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, placement_id = 0;

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		if (sock->net_impl == group_impl->net_impl) {
			break;
		}
	}

	if (group_impl == NULL) {
		errno = EINVAL;
		return -1;
	}

	assert(group_impl == sock->group_impl);

	rc = sock->net_impl->get_placement_id(sock, &placement_id);
	if (!rc && (placement_id != 0)) {
		spdk_sock_map_release(placement_id);
	}

	rc = group_impl->net_impl->group_impl_remove_sock(group_impl, sock);
	if (rc == 0) {
		TAILQ_REMOVE(&group_impl->socks, sock, link);
		sock->group_impl = NULL;
		sock->cb_fn = NULL;
		sock->cb_arg = NULL;
	}

	return rc;
}

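/* Poll the group once, dispatching up to MAX_EVENTS_PER_POLL ready sockets per
 * net implementation to their callbacks. A minimal sketch of a typical caller
 * (names are illustrative, not part of this API):
 *
 *	struct spdk_sock_group *g = spdk_sock_group_create(NULL);
 *	spdk_sock_group_add_sock(g, sock, my_read_cb, my_ctx);
 *	while (running) {
 *		spdk_sock_group_poll(g);
 *	}
 */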
int
spdk_sock_group_poll(struct spdk_sock_group *group)
{
	return spdk_sock_group_poll_count(group, MAX_EVENTS_PER_POLL);
}

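/* Poll a single group_impl and invoke the registered callback for each socket
 * reported ready. Returns the number of events, or -1 on poll failure.
 */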
static int
spdk_sock_group_impl_poll_count(struct spdk_sock_group_impl *group_impl,
				struct spdk_sock_group *group,
				int max_events)
{
	struct spdk_sock *socks[MAX_EVENTS_PER_POLL];
	int num_events, i;

	if (TAILQ_EMPTY(&group_impl->socks)) {
		return 0;
	}

	num_events = group_impl->net_impl->group_impl_poll(group_impl, max_events, socks);
	if (num_events == -1) {
		return -1;
	}

	for (i = 0; i < num_events; i++) {
		struct spdk_sock *sock = socks[i];

		assert(sock->cb_fn != NULL);
		sock->cb_fn(sock->cb_arg, group, sock);
	}
	return num_events;
}

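/* Poll every group_impl in the group for up to max_events events each.
 * Returns the total number of events dispatched, or -1 if any implementation
 * failed to poll.
 */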
int
spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events)
{
	struct spdk_sock_group_impl *group_impl = NULL;
	int rc, num_events = 0;

	if (max_events < 1) {
		errno = EINVAL;
		return -1;
	}

	/*
	 * Only poll for up to MAX_EVENTS_PER_POLL events at a time - if more
	 *  events are pending, the next call to this function will reap them.
	 */
	if (max_events > MAX_EVENTS_PER_POLL) {
		max_events = MAX_EVENTS_PER_POLL;
	}

	STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) {
		rc = spdk_sock_group_impl_poll_count(group_impl, group, max_events);
		if (rc < 0) {
			num_events = -1;
			SPDK_ERRLOG("group_impl_poll_count for net(%s) failed\n",
				    group_impl->net_impl->name);
		} else if (num_events >= 0) {
			num_events += rc;
		}
	}

	return num_events;
}

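/* Close a socket group. Fails with EBUSY if any sockets are still members;
 * otherwise closes every group_impl, removes the group from the placement map,
 * frees it, and clears the caller's pointer.
 */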
int
spdk_sock_group_close(struct spdk_sock_group **group)
{
	struct spdk_sock_group_impl *group_impl = NULL, *tmp;
	int rc;

	if (*group == NULL) {
		errno = EBADF;
		return -1;
	}

	STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) {
		if (!TAILQ_EMPTY(&group_impl->socks)) {
			errno = EBUSY;
			return -1;
		}
	}

	STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) {
		rc = group_impl->net_impl->group_impl_close(group_impl);
		if (rc != 0) {
			SPDK_ERRLOG("group_impl_close for net(%s) failed\n",
				    group_impl->net_impl->name);
		}
	}

	spdk_sock_remove_sock_group_from_map_table(*group);
	free(*group);
	*group = NULL;

	return 0;
}

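/* Register a net implementation (typically invoked via the
 * SPDK_NET_IMPL_REGISTER constructor macro). The "posix" implementation is
 * appended to the tail so that any other registered implementation is tried
 * before it in spdk_sock_connect()/spdk_sock_listen().
 */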
void
spdk_net_impl_register(struct spdk_net_impl *impl)
{
	if (!strcmp("posix", impl->name)) {
		STAILQ_INSERT_TAIL(&g_net_impls, impl, link);
	} else {
		STAILQ_INSERT_HEAD(&g_net_impls, impl, link);
	}
}