xref: /spdk/include/spdk_internal/sock.h (revision b3bec07939ebe2ea2e0c43931705d32aa9e06719)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
4  */
5 
6 /** \file
7  * TCP network implementation abstraction layer
8  */
9 
10 #ifndef SPDK_INTERNAL_SOCK_H
11 #define SPDK_INTERNAL_SOCK_H
12 
13 #include "spdk/stdinc.h"
14 #include "spdk/sock.h"
15 #include "spdk/queue.h"
16 #include "spdk/likely.h"
17 
18 #ifdef __cplusplus
19 extern "C" {
20 #endif
21 
/* Upper bound on the number of sockets returned by a single group poll. */
#define MAX_EVENTS_PER_POLL 32
/* Socket priority used when the caller does not specify one. */
#define DEFAULT_SOCK_PRIORITY 0
/* Minimum pipe size, in bytes — NOTE(review): consumer not visible in this header. */
#define MIN_SOCK_PIPE_SIZE 1024
/* Default and minimum sizes (bytes) for the SO_RCVBUF/SO_SNDBUF socket buffers. */
#define DEFAULT_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define DEFAULT_SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_RCVBUF_SIZE (4 * 1024)
#define MIN_SO_SNDBUF_SIZE (4 * 1024)
/* Maximum number of iovecs gathered into one batch by spdk_sock_prep_reqs(). */
#define IOV_BATCH_SIZE 64
30 
/* Base socket object shared by all net_impl backends. */
struct spdk_sock {
	/* Backend (function table) that services this socket. */
	struct spdk_net_impl		*net_impl;
	struct spdk_sock_opts		opts;
	/* Poll group implementation this socket was added to, if any. */
	struct spdk_sock_group_impl	*group_impl;
	/* Link for the group's socket list (see spdk_sock_group_impl::socks). */
	TAILQ_ENTRY(spdk_sock)		link;

	/* Write requests added via spdk_sock_request_queue(), not yet in flight. */
	TAILQ_HEAD(, spdk_sock_request)	queued_reqs;
	/* Requests moved off queued_reqs by spdk_sock_request_pend(), awaiting completion. */
	TAILQ_HEAD(, spdk_sock_request)	pending_reqs;
	/* Outstanding async read request, aborted by spdk_sock_abort_requests(). */
	struct spdk_sock_request	*read_req;
	/* Sum of iovcnt over all requests currently on queued_reqs. */
	int				queued_iovcnt;
	/* Number of user callbacks currently executing; while non-zero, a
	 * user-initiated close is deferred (see spdk_sock_request_complete()). */
	int				cb_cnt;
	spdk_sock_cb			cb_fn;
	void				*cb_arg;
	struct {
		/* Set once the user has closed the socket. */
		uint8_t		closed		: 1;
		uint8_t		reserved	: 7;
	} flags;
	struct spdk_sock_impl_opts	impl_opts;
};
50 
/* A user-provided receive buffer held in a group's buffer pool
 * (see spdk_sock_group::pool and spdk_sock_group_get_buf()). */
struct spdk_sock_group_provided_buf {
	size_t						len;	/* Usable length of the buffer. */
	void						*ctx;	/* Opaque user context for this buffer. */
	STAILQ_ENTRY(spdk_sock_group_provided_buf)	link;	/* Entry in the group's pool list. */
};
56 
/* A poll group aggregating one group-impl per registered net_impl. */
struct spdk_sock_group {
	/* One spdk_sock_group_impl per backend participating in this group. */
	STAILQ_HEAD(, spdk_sock_group_impl)	group_impls;
	/* Pool of user-provided receive buffers (spdk_sock_group_provided_buf). */
	STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
	void					*ctx;	/* Opaque user context. */
};
62 
/* Backend-specific portion of a poll group. */
struct spdk_sock_group_impl {
	struct spdk_net_impl			*net_impl;	/* Backend that owns this impl. */
	struct spdk_sock_group			*group;		/* Parent group. */
	TAILQ_HEAD(, spdk_sock)			socks;		/* Sockets added to this group impl. */
	STAILQ_ENTRY(spdk_sock_group_impl)	link;		/* Entry in group->group_impls. */
};
69 
/* Mutex-protected placement_id -> group mapping; manipulated via the
 * spdk_sock_map_* functions declared at the bottom of this header. */
struct spdk_sock_map {
	STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
	pthread_mutex_t mtx;	/* Serializes all access to entries. */
};
74 
/* Function table implemented by each socket backend and registered (with a
 * priority) via SPDK_NET_IMPL_REGISTER. Optional callbacks may be NULL —
 * NOTE(review): which ones are optional is enforced by the callers, not here. */
struct spdk_net_impl {
	const char *name;
	int priority;

	/* Connection setup / teardown and address queries. */
	int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
		       int clen, uint16_t *cport);
	struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*accept)(struct spdk_sock *sock);
	int (*close)(struct spdk_sock *sock);

	/* Synchronous data path. */
	ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	/* Asynchronous data path. */
	int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	int (*flush)(struct spdk_sock *sock);

	/* Socket tuning. */
	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
	int (*set_sendbuf)(struct spdk_sock *sock, int sz);

	/* State queries. */
	bool (*is_ipv6)(struct spdk_sock *sock);
	bool (*is_ipv4)(struct spdk_sock *sock);
	bool (*is_connected)(struct spdk_sock *sock);

	/* Poll group management. */
	struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
			struct spdk_sock_group_impl *hint);
	struct spdk_sock_group_impl *(*group_impl_create)(void);
	int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
			       struct spdk_sock **socks);
	int (*group_impl_close)(struct spdk_sock_group_impl *group);

	/* Implementation-specific option get/set. */
	int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
	int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);

	/* Entry in the global list of registered implementations. */
	STAILQ_ENTRY(spdk_net_impl) link;
};
116 
void spdk_net_impl_register(struct spdk_net_impl *impl, int priority);

/* Register a net_impl automatically at load time: expands to a function with
 * __attribute__((constructor)), so registration runs before main(). */
#define SPDK_NET_IMPL_REGISTER(name, impl, priority) \
static void __attribute__((constructor)) net_impl_register_##name(void) \
{ \
	spdk_net_impl_register(impl, priority); \
}
124 
125 size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);
126 
/* Append a new write request to the socket's queued list and account its
 * iovecs in queued_iovcnt. The request must not already be on any list. */
static inline void
spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == NULL);
	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
	/* curr_list is debug-only bookkeeping checked by asserts in the
	 * pend/put/abort helpers below. */
	req->internal.curr_list = &sock->queued_reqs;
#endif
	sock->queued_iovcnt += req->iovcnt;
}
137 
/* Move a request from the queued list to the pending list, removing its
 * iovec count from queued_iovcnt. Must only be called on a queued request. */
static inline void
spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == &sock->queued_reqs);
	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
	assert(sock->queued_iovcnt >= req->iovcnt);
	sock->queued_iovcnt -= req->iovcnt;
	TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	/* Debug-only list tracking; see the asserts in put/abort. */
	req->internal.curr_list = &sock->pending_reqs;
#endif
}
150 
151 static inline int
152 spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
153 {
154 	bool closed;
155 	int rc = 0;
156 
157 	req->internal.offset = 0;
158 	req->internal.is_zcopy = 0;
159 
160 	closed = sock->flags.closed;
161 	sock->cb_cnt++;
162 	req->cb_fn(req->cb_arg, err);
163 	assert(sock->cb_cnt > 0);
164 	sock->cb_cnt--;
165 
166 	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
167 		/* The user closed the socket in response to a callback above. */
168 		rc = -1;
169 		spdk_sock_close(&sock);
170 	}
171 
172 	return rc;
173 }
174 
/* Remove a request from the pending list and complete it with the given
 * error code. Returns the result of spdk_sock_request_complete() (-1 if the
 * user callback closed the socket, 0 otherwise). */
static inline int
spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	assert(req->internal.curr_list == &sock->pending_reqs);
	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = NULL;
#endif
	return spdk_sock_request_complete(sock, req, err);
}
185 
/* Fail every outstanding request on the socket — pending, queued, and the
 * in-progress read request — with -ECANCELED. Returns -1 if a user callback
 * closed the socket and the deferred close was carried out here, 0 otherwise. */
static inline int
spdk_sock_abort_requests(struct spdk_sock *sock)
{
	struct spdk_sock_request *req;
	bool closed;
	int rc = 0;

	/* As in spdk_sock_request_complete(): snapshot the closed flag and
	 * hold cb_cnt for the whole loop so a close from inside any callback
	 * is deferred until all callbacks have run. */
	closed = sock->flags.closed;
	sock->cb_cnt++;

	/* Re-read the list head each iteration rather than using FOREACH_SAFE:
	 * a callback may dequeue other requests. */
	req = TAILQ_FIRST(&sock->pending_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->pending_reqs);
		TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->pending_reqs);
	}

	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->queued_reqs);
		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		/* Queued (unlike pending) requests still count toward queued_iovcnt. */
		assert(sock->queued_iovcnt >= req->iovcnt);
		sock->queued_iovcnt -= req->iovcnt;

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	/* Clear read_req before its callback so a re-entrant abort cannot
	 * complete it twice. */
	req = sock->read_req;
	if (req != NULL) {
		sock->read_req = NULL;
		req->cb_fn(req->cb_arg, -ECANCELED);
	}
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	/* Callbacks must not have queued new requests during an abort. */
	assert(TAILQ_EMPTY(&sock->queued_reqs));
	assert(TAILQ_EMPTY(&sock->pending_reqs));

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}
244 
245 static inline int
246 spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
247 		   uint64_t *num_bytes)
248 {
249 	unsigned int offset;
250 	int iovcnt, i;
251 
252 	assert(index < IOV_BATCH_SIZE);
253 	offset = req->internal.offset;
254 	iovcnt = index;
255 
256 	for (i = 0; i < req->iovcnt; i++) {
257 		/* Consume any offset first */
258 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
259 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
260 			continue;
261 		}
262 
263 		iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
264 		iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
265 		if (num_bytes != NULL) {
266 			*num_bytes += iovs[iovcnt].iov_len;
267 		}
268 
269 		iovcnt++;
270 		offset = 0;
271 
272 		if (iovcnt >= IOV_BATCH_SIZE) {
273 			break;
274 		}
275 	}
276 
277 	return iovcnt;
278 }
279 
/* Gather iovecs from the socket's queued requests into iovs, starting at
 * iovs[index] and resuming after *last_req when provided, up to
 * IOV_BATCH_SIZE entries. Returns the total number of valid entries.
 *
 * *last_req is advanced only past requests whose iovecs were fully gathered;
 * a request that only partially fit in the batch is revisited on the next
 * call. When MSG_ZEROCOPY is available and the gathered byte total is below
 * the implementation's zerocopy_threshold, the MSG_ZEROCOPY bit is cleared
 * from *flags. */
static inline int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
		    struct spdk_sock_request **last_req, int *flags)
{
	int iovcnt;
	struct spdk_sock_request *req;
	uint64_t total = 0;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
		if (iovcnt >= IOV_BATCH_SIZE) {
			/* Batch full; deliberately do not advance *last_req,
			 * so this (possibly partial) request is resumed later. */
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:

#if defined(MSG_ZEROCOPY)
	/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
	if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
		*flags = *flags & (~MSG_ZEROCOPY);
	}
#endif

	return iovcnt;
}
323 
324 static inline void
325 spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
326 {
327 	*placement_id = -1;
328 
329 	switch (mode) {
330 	case PLACEMENT_NONE:
331 		break;
332 	case PLACEMENT_MARK:
333 	case PLACEMENT_NAPI: {
334 #if defined(SO_INCOMING_NAPI_ID)
335 		socklen_t len = sizeof(int);
336 
337 		getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
338 #endif
339 		break;
340 	}
341 	case PLACEMENT_CPU: {
342 #if defined(SO_INCOMING_CPU)
343 		socklen_t len = sizeof(int);
344 
345 		getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
346 #endif
347 		break;
348 	}
349 	default:
350 		break;
351 	}
352 }
353 
354 /**
355  * Insert a group into the placement map.
356  * If the group is already in the map, take a reference.
357  */
358 int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
359 			 struct spdk_sock_group_impl *group_impl);
360 
361 /**
362  * Release a reference for the given placement_id. If the reference count goes to 0, the
363  * entry will no longer be associated with a group.
364  */
365 void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);
366 
367 /**
368  * Look up the group for the given placement_id.
369  */
370 int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
371 			 struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint);
372 
373 /**
374  * Find a placement id with no associated group
375  */
376 int spdk_sock_map_find_free(struct spdk_sock_map *map);
377 
378 /**
379  * Clean up all memory associated with the given map
380  */
381 void spdk_sock_map_cleanup(struct spdk_sock_map *map);
382 
383 #ifdef __cplusplus
384 }
385 #endif
386 
387 #endif /* SPDK_INTERNAL_SOCK_H */
388