/* xref: /spdk/include/spdk_internal/sock.h (revision cdb0726b95631d46eaf4f2e39ddb6533f150fd27) */
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation. All rights reserved.
3  *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
4  */
5 
6 /** \file
7  * TCP network implementation abstraction layer
8  */
9 
10 #ifndef SPDK_INTERNAL_SOCK_H
11 #define SPDK_INTERNAL_SOCK_H
12 
13 #include "spdk/stdinc.h"
14 #include "spdk/sock.h"
15 #include "spdk/queue.h"
16 #include "spdk/likely.h"
17 
18 #ifdef __cplusplus
19 extern "C" {
20 #endif
21 
22 #define MAX_EVENTS_PER_POLL 32
23 #define DEFAULT_SOCK_PRIORITY 0
24 #define MIN_SOCK_PIPE_SIZE 1024
25 #define MIN_SO_RCVBUF_SIZE (2 * 1024 * 1024)
26 #define MIN_SO_SNDBUF_SIZE (2 * 1024 * 1024)
27 #define IOV_BATCH_SIZE 64
28 
/*
 * Generic socket object shared by all transport implementations.
 */
struct spdk_sock {
	struct spdk_net_impl		*net_impl;	/* transport backing this socket */
	struct spdk_sock_opts		opts;		/* user-supplied creation options */
	struct spdk_sock_group_impl	*group_impl;	/* owning poll group, or NULL */
	TAILQ_ENTRY(spdk_sock)		link;		/* linkage for a socket list (presumably
							 * spdk_sock_group_impl.socks — confirm) */

	TAILQ_HEAD(, spdk_sock_request)	queued_reqs;	/* write requests accepted but not yet submitted */
	TAILQ_HEAD(, spdk_sock_request)	pending_reqs;	/* write requests submitted, awaiting completion */
	struct spdk_sock_request	*read_req;	/* outstanding async read request, if any */
	int				queued_iovcnt;	/* total iov count across queued_reqs */
	int				cb_cnt;		/* completion callbacks currently on the stack;
							 * non-zero defers socket teardown */
	spdk_sock_cb			cb_fn;
	void				*cb_arg;
	struct {
		uint8_t		closed		: 1;	/* user requested close; honored once cb_cnt == 0 */
		uint8_t		reserved	: 7;
	} flags;
	struct spdk_sock_impl_opts	impl_opts;	/* per-transport tunables (e.g. zerocopy_threshold) */
};
48 
/*
 * User-visible poll group: a collection of per-transport group
 * implementations plus an opaque user context.
 */
struct spdk_sock_group {
	STAILQ_HEAD(, spdk_sock_group_impl)	group_impls;	/* one impl per registered transport */
	void					*ctx;		/* opaque user context */
};
53 
54 struct spdk_sock_group_impl {
55 	struct spdk_net_impl			*net_impl;
56 	struct spdk_sock_group			*group;
57 	TAILQ_HEAD(, spdk_sock)			socks;
58 	STAILQ_ENTRY(spdk_sock_group_impl)	link;
59 };
60 
/*
 * Thread-safe map from placement ids to poll-group impls
 * (see spdk_sock_map_insert/lookup/release below).
 */
struct spdk_sock_map {
	STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
	pthread_mutex_t mtx;	/* serializes access to entries */
};
65 
/*
 * Function table that each socket transport implements and registers via
 * SPDK_NET_IMPL_REGISTER(). All operations receive the generic
 * spdk_sock/spdk_sock_group_impl objects owned by that transport.
 */
struct spdk_net_impl {
	const char *name;
	int priority;	/* selection priority; also passed to spdk_net_impl_register() */

	/* Connection setup and identity */
	int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
		       int clen, uint16_t *cport);
	struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*accept)(struct spdk_sock *sock);
	int (*close)(struct spdk_sock *sock);

	/* Synchronous data path */
	ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	/* Asynchronous data path */
	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	int (*flush)(struct spdk_sock *sock);

	/* Socket buffer tuning */
	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
	int (*set_sendbuf)(struct spdk_sock *sock, int sz);

	/* State queries */
	bool (*is_ipv6)(struct spdk_sock *sock);
	bool (*is_ipv4)(struct spdk_sock *sock);
	bool (*is_connected)(struct spdk_sock *sock);

	/* Poll-group management */
	struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
			struct spdk_sock_group_impl *hint);
	struct spdk_sock_group_impl *(*group_impl_create)(void);
	int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
			       struct spdk_sock **socks);
	int (*group_impl_close)(struct spdk_sock_group_impl *group);

	/* Transport-wide option get/set */
	int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
	int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);

	STAILQ_ENTRY(spdk_net_impl) link;	/* entry in the global transport list */
};
106 
/**
 * Register a socket transport implementation under the given priority.
 * Normally invoked from a constructor generated by SPDK_NET_IMPL_REGISTER().
 */
void spdk_net_impl_register(struct spdk_net_impl *impl, int priority);
108 
/*
 * Define a GCC/Clang constructor that registers `impl` with the given
 * priority before main() runs. `name` only disambiguates the generated
 * function's identifier.
 */
#define SPDK_NET_IMPL_REGISTER(name, impl, priority) \
static void __attribute__((constructor)) net_impl_register_##name(void) \
{ \
	spdk_net_impl_register(impl, priority); \
}
114 
/*
 * Append a write request to the socket's queued (not yet submitted) list
 * and account for its iov count. The request must not be on any list.
 */
static inline void
spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == NULL);
	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
	/* Track list membership so the asserts in these helpers catch misuse. */
	req->internal.curr_list = &sock->queued_reqs;
#endif
	sock->queued_iovcnt += req->iovcnt;
}
125 
/*
 * Move a request from the queued list to the pending (submitted) list,
 * removing its iovs from the queued accounting.
 */
static inline void
spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == &sock->queued_reqs);
	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
	assert(sock->queued_iovcnt >= req->iovcnt);
	sock->queued_iovcnt -= req->iovcnt;
	TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	/* Track list membership for the debug asserts. */
	req->internal.curr_list = &sock->pending_reqs;
#endif
}
138 
/*
 * Run a request's completion callback and handle a close performed from
 * inside that callback.
 *
 * cb_cnt is incremented around the callback so that a close issued while
 * a callback is on the stack is only recorded in flags.closed rather than
 * tearing the socket down immediately (NOTE(review): the deferral itself
 * presumably happens in spdk_sock_close() — confirm in sock.c).
 *
 * \return 0 normally; -1 if the callback closed the socket and the
 *         deferred close was completed here — the caller must not touch
 *         `sock` afterwards.
 */
static inline int
spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	bool closed;
	int rc = 0;

	/* Reset per-submission state so the request can be resubmitted. */
	req->internal.offset = 0;
	req->internal.is_zcopy = 0;

	/* Snapshot the flag so we only react to a close made by this callback. */
	closed = sock->flags.closed;
	sock->cb_cnt++;
	req->cb_fn(req->cb_arg, err);
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}
162 
/*
 * Remove a request from the pending list and complete it with `err`.
 *
 * \return the result of spdk_sock_request_complete(): 0, or -1 if the
 *         completion callback closed the socket.
 */
static inline int
spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	assert(req->internal.curr_list == &sock->pending_reqs);
	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = NULL;
#endif
	return spdk_sock_request_complete(sock, req, err);
}
173 
/*
 * Fail every outstanding request on the socket (pending writes, queued
 * writes, and any outstanding read) with -ECANCELED.
 *
 * A single cb_cnt increment brackets all callbacks so that a close issued
 * from any of them is deferred until the last callback returns, using the
 * same pattern as spdk_sock_request_complete().
 *
 * \return 0 normally; -1 if a callback closed the socket and the deferred
 *         close was completed here — the caller must not touch `sock`.
 */
static inline int
spdk_sock_abort_requests(struct spdk_sock *sock)
{
	struct spdk_sock_request *req;
	bool closed;
	int rc = 0;

	/* Snapshot the flag so we only react to a close made by these callbacks. */
	closed = sock->flags.closed;
	sock->cb_cnt++;

	/* Abort submitted (pending) writes first. */
	req = TAILQ_FIRST(&sock->pending_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->pending_reqs);
		TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		req->cb_fn(req->cb_arg, -ECANCELED);

		/* Re-read the head: the callback may have altered the list. */
		req = TAILQ_FIRST(&sock->pending_reqs);
	}

	/* Then abort writes that were never submitted. */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->queued_reqs);
		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		assert(sock->queued_iovcnt >= req->iovcnt);
		sock->queued_iovcnt -= req->iovcnt;

		req->cb_fn(req->cb_arg, -ECANCELED);

		/* Re-read the head: the callback may have altered the list. */
		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	/* Finally abort the outstanding read, if any. */
	req = sock->read_req;
	if (req != NULL) {
		sock->read_req = NULL;
		req->cb_fn(req->cb_arg, -ECANCELED);
	}
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	assert(TAILQ_EMPTY(&sock->queued_reqs));
	assert(TAILQ_EMPTY(&sock->pending_reqs));

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}
232 
233 static inline int
234 spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
235 		   uint64_t *num_bytes)
236 {
237 	unsigned int offset;
238 	int iovcnt, i;
239 
240 	assert(index < IOV_BATCH_SIZE);
241 	offset = req->internal.offset;
242 	iovcnt = index;
243 
244 	for (i = 0; i < req->iovcnt; i++) {
245 		/* Consume any offset first */
246 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
247 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
248 			continue;
249 		}
250 
251 		iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
252 		iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
253 		if (num_bytes != NULL) {
254 			*num_bytes += iovs[iovcnt].iov_len;
255 		}
256 
257 		iovcnt++;
258 		offset = 0;
259 
260 		if (iovcnt >= IOV_BATCH_SIZE) {
261 			break;
262 		}
263 	}
264 
265 	return iovcnt;
266 }
267 
/*
 * Gather iovecs from the socket's queued write requests into iovs[],
 * starting at slot `index`, up to IOV_BATCH_SIZE entries total.
 *
 * If last_req is non-NULL, gathering resumes with the request after
 * *last_req, and *last_req is advanced to the last request processed
 * before the batch filled up. A request cut off by the batch limit is not
 * recorded in *last_req, so a later call revisits it (its unsent portion
 * is presumably tracked via internal.offset by the caller — confirm in
 * the transport code).
 *
 * With MSG_ZEROCOPY compiled in: if the total gathered byte count is below
 * the transport's zerocopy_threshold, the MSG_ZEROCOPY bit is cleared from
 * *flags (when flags is non-NULL).
 *
 * \return the total number of entries now in iovs[].
 */
static inline int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
		    struct spdk_sock_request **last_req, int *flags)
{
	int iovcnt;
	struct spdk_sock_request *req;
	uint64_t total = 0;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
		if (iovcnt >= IOV_BATCH_SIZE) {
			/* Batch full; do not record req — it may be partial. */
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:

#if defined(MSG_ZEROCOPY)
	/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
	if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
		*flags = *flags & (~MSG_ZEROCOPY);
	}
#endif

	return iovcnt;
}
311 
/*
 * Derive a placement id for a socket fd according to `mode`.
 *
 * *placement_id is set to -1 up front and remains -1 for PLACEMENT_NONE,
 * when the platform lacks the relevant socket option, or when getsockopt
 * fails (its return value is intentionally ignored).
 */
static inline void
spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
{
	*placement_id = -1;

	switch (mode) {
	case PLACEMENT_NONE:
		break;
	case PLACEMENT_MARK:
	/* fallthrough: MARK also keys off the NAPI id */
	case PLACEMENT_NAPI: {
#if defined(SO_INCOMING_NAPI_ID)
		socklen_t len = sizeof(int);

		getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
#endif
		break;
	}
	case PLACEMENT_CPU: {
#if defined(SO_INCOMING_CPU)
		socklen_t len = sizeof(int);

		getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
#endif
		break;
	}
	default:
		break;
	}
}
341 
342 /**
343  * Insert a group into the placement map.
344  * If the group is already in the map, take a reference.
345  */
346 int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
347 			 struct spdk_sock_group_impl *group_impl);
348 
349 /**
350  * Release a reference for the given placement_id. If the reference count goes to 0, the
351  * entry will no longer be associated with a group.
352  */
353 void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);
354 
355 /**
356  * Look up the group for the given placement_id.
357  */
358 int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
359 			 struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint);
360 
361 /**
362  * Find a placement id with no associated group
363  */
364 int spdk_sock_map_find_free(struct spdk_sock_map *map);
365 
366 /**
367  * Clean up all memory associated with the given map
368  */
369 void spdk_sock_map_cleanup(struct spdk_sock_map *map);
370 
371 #ifdef __cplusplus
372 }
373 #endif
374 
375 #endif /* SPDK_INTERNAL_SOCK_H */
376