/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation. All rights reserved.
 *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
 */

/** \file
 * TCP network implementation abstraction layer
 */

#ifndef SPDK_INTERNAL_SOCK_H
#define SPDK_INTERNAL_SOCK_H

#include "spdk/stdinc.h"
#include "spdk/sock.h"
#include "spdk/queue.h"
#include "spdk/likely.h"
#include "spdk/log.h"

#ifdef __cplusplus
extern "C" {
#endif

#define MAX_EVENTS_PER_POLL 32
#define DEFAULT_SOCK_PRIORITY 0
#define MIN_SOCK_PIPE_SIZE 1024
#define DEFAULT_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define DEFAULT_SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_RCVBUF_SIZE (4 * 1024)
#define MIN_SO_SNDBUF_SIZE (4 * 1024)
#define IOV_BATCH_SIZE 64

struct spdk_sock {
	struct spdk_net_impl		*net_impl;
	struct spdk_sock_opts		opts;
	struct spdk_sock_group_impl	*group_impl;
	TAILQ_ENTRY(spdk_sock)		link;

	TAILQ_HEAD(, spdk_sock_request)	queued_reqs;
	TAILQ_HEAD(, spdk_sock_request)	pending_reqs;
	struct spdk_sock_request	*read_req;
	int				queued_iovcnt;
	int				cb_cnt;
	spdk_sock_cb			cb_fn;
	void				*cb_arg;
	struct {
		uint8_t		closed		: 1;
		uint8_t		reserved	: 7;
	} flags;
	struct spdk_sock_impl_opts	impl_opts;
};

struct spdk_sock_group_provided_buf {
	size_t						len;
	void						*ctx;
	STAILQ_ENTRY(spdk_sock_group_provided_buf)	link;
};

struct spdk_sock_group {
	STAILQ_HEAD(, spdk_sock_group_impl)	group_impls;
	STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
	void					*ctx;
};

struct spdk_sock_group_impl {
	struct spdk_net_impl			*net_impl;
	struct spdk_sock_group			*group;
	TAILQ_HEAD(, spdk_sock)			socks;
	STAILQ_ENTRY(spdk_sock_group_impl)	link;
};

struct spdk_sock_map {
	STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
	pthread_mutex_t mtx;
};

struct spdk_net_impl {
	const char *name;
	int priority;

	int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
		       int clen, uint16_t *cport);
	struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*accept)(struct spdk_sock *sock);
	int (*close)(struct spdk_sock *sock);
	ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	int (*flush)(struct spdk_sock *sock);

	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
	int (*set_sendbuf)(struct spdk_sock *sock, int sz);

	bool (*is_ipv6)(struct spdk_sock *sock);
	bool (*is_ipv4)(struct spdk_sock *sock);
	bool (*is_connected)(struct spdk_sock *sock);

	struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
			struct spdk_sock_group_impl *hint);
	struct spdk_sock_group_impl *(*group_impl_create)(void);
	int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
			       struct spdk_sock **socks);
	int (*group_impl_close)(struct spdk_sock_group_impl *group);

	int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
	int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);

	STAILQ_ENTRY(spdk_net_impl) link;
};

void spdk_net_impl_register(struct spdk_net_impl *impl, int priority);

#define SPDK_NET_IMPL_REGISTER(name, impl, priority) \
static void __attribute__((constructor)) net_impl_register_##name(void) \
{ \
	spdk_net_impl_register(impl, priority); \
}
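
/*
 * Illustrative sketch only (not code from this repository): a socket module
 * would typically define a static struct spdk_net_impl with its callbacks
 * filled in and register it at load time via the constructor generated by
 * SPDK_NET_IMPL_REGISTER. The names "mymod", g_mymod_net_impl and the
 * mymod_sock_* callbacks below are hypothetical.
 *
 *	static struct spdk_net_impl g_mymod_net_impl = {
 *		.name		= "mymod",
 *		.connect	= mymod_sock_connect,
 *		.listen		= mymod_sock_listen,
 *		.close		= mymod_sock_close,
 *		.writev_async	= mymod_sock_writev_async,
 *		// ...remaining callbacks...
 *	};
 *
 *	SPDK_NET_IMPL_REGISTER(mymod, &g_mymod_net_impl, DEFAULT_SOCK_PRIORITY);
 */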

size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);

static inline void
spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == NULL);
	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = &sock->queued_reqs;
#endif
	sock->queued_iovcnt += req->iovcnt;
}

static inline void
spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == &sock->queued_reqs);
	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
	assert(sock->queued_iovcnt >= req->iovcnt);
	sock->queued_iovcnt -= req->iovcnt;
	TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = &sock->pending_reqs;
#endif
}

static inline int
spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	bool closed;
	int rc = 0;

	req->internal.offset = 0;
	req->internal.is_zcopy = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;
	req->cb_fn(req->cb_arg, err);
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}

static inline int
spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	assert(req->internal.curr_list == &sock->pending_reqs);
	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = NULL;
#endif
	return spdk_sock_request_complete(sock, req, err);
}
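
/*
 * Illustrative sketch only (hypothetical, heavily simplified) of how an
 * implementation's write path is expected to use the helpers above: a
 * writev_async callback queues the request with spdk_sock_request_queue(),
 * the flush path moves it to the pending list with spdk_sock_request_pend()
 * once its data has been handed to the kernel, and completes it with
 * spdk_sock_request_put() when the send finishes. The name mymod_sock_flush()
 * and its internals are assumptions for the example.
 *
 *	static void
 *	mymod_sock_flush(struct spdk_sock *sock)
 *	{
 *		struct spdk_sock_request *req;
 *
 *		while ((req = TAILQ_FIRST(&sock->queued_reqs)) != NULL) {
 *			// ...submit the request's iovecs to the kernel...
 *			spdk_sock_request_pend(sock, req);
 *			// Once the data is fully transmitted:
 *			if (spdk_sock_request_put(sock, req, 0) != 0) {
 *				// The socket was closed from the completion callback.
 *				return;
 *			}
 *		}
 *	}
 */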

static inline int
spdk_sock_abort_requests(struct spdk_sock *sock)
{
	struct spdk_sock_request *req;
	bool closed;
	int rc = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;

	req = TAILQ_FIRST(&sock->pending_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->pending_reqs);
		TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->pending_reqs);
	}

	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->queued_reqs);
		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		assert(sock->queued_iovcnt >= req->iovcnt);
		sock->queued_iovcnt -= req->iovcnt;

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	req = sock->read_req;
	if (req != NULL) {
		sock->read_req = NULL;
		req->cb_fn(req->cb_arg, -ECANCELED);
	}
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	assert(TAILQ_EMPTY(&sock->queued_reqs));
	assert(TAILQ_EMPTY(&sock->pending_reqs));

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}

static inline int
spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
		   uint64_t *num_bytes)
{
	unsigned int offset;
	int iovcnt, i;

	assert(index < IOV_BATCH_SIZE);
	offset = req->internal.offset;
	iovcnt = index;

	for (i = 0; i < req->iovcnt; i++) {
		/* Consume any offset first */
		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
			continue;
		}

		iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
		iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
		if (num_bytes != NULL) {
			*num_bytes += iovs[iovcnt].iov_len;
		}

		iovcnt++;
		offset = 0;

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}
	}

	return iovcnt;
}

static inline int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
		    struct spdk_sock_request **last_req, int *flags)
{
	int iovcnt;
	struct spdk_sock_request *req;
	uint64_t total = 0;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:

#if defined(MSG_ZEROCOPY)
	/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
	if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
		*flags = *flags & (~MSG_ZEROCOPY);
	}
#endif

	return iovcnt;
}
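
/*
 * Illustrative sketch only (hypothetical, not code from this repository) of
 * how a flush routine might use spdk_sock_prep_reqs() to gather queued
 * requests into a single scatter-gather write. The identifiers
 * mymod_sock_do_flush() and fd are assumptions for the example.
 *
 *	static ssize_t
 *	mymod_sock_do_flush(struct spdk_sock *sock, int fd)
 *	{
 *		struct iovec iovs[IOV_BATCH_SIZE];
 *		struct msghdr msg = {};
 *		int flags = 0;
 *		int iovcnt;
 *
 *		iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
 *		if (iovcnt == 0) {
 *			return 0;
 *		}
 *
 *		msg.msg_iov = iovs;
 *		msg.msg_iovlen = iovcnt;
 *		return sendmsg(fd, &msg, flags);
 *	}
 */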

static inline void
spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
{
	*placement_id = -1;

	switch (mode) {
	case PLACEMENT_NONE:
		break;
	case PLACEMENT_MARK:
	case PLACEMENT_NAPI: {
#if defined(SO_INCOMING_NAPI_ID)
		socklen_t len = sizeof(int);

		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
		if (rc == -1) {
			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
			assert(false);
		}
#endif
		break;
	}
	case PLACEMENT_CPU: {
#if defined(SO_INCOMING_CPU)
		socklen_t len = sizeof(int);

		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
		if (rc == -1) {
			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
			assert(false);
		}
#endif
		break;
	}
	default:
		break;
	}
}

/**
 * Insert a group into the placement map.
 * If the group is already in the map, take a reference.
 */
int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl *group_impl);

/**
 * Release a reference for the given placement_id. If the reference count goes to 0, the
 * entry will no longer be associated with a group.
 */
void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);

/**
 * Look up the group for the given placement_id.
 */
int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint);

/**
 * Find a placement id with no associated group
 */
int spdk_sock_map_find_free(struct spdk_sock_map *map);

/**
 * Clean up all memory associated with the given map
 */
void spdk_sock_map_cleanup(struct spdk_sock_map *map);
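
/*
 * Illustrative sketch only of typical placement-map usage when a socket joins
 * a poll group: derive a placement_id from the socket's fd, bind it to the
 * group's spdk_sock_group_impl, and release it again when the socket leaves
 * the group. The variable g_map, the fd/group_impl parameters and the
 * PLACEMENT_NAPI mode chosen below are assumptions for the example.
 *
 *	static struct spdk_sock_map g_map = {
 *		.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
 *		.mtx = PTHREAD_MUTEX_INITIALIZER,
 *	};
 *
 *	// In a hypothetical group_impl_add_sock() callback:
 *	int placement_id;
 *
 *	spdk_sock_get_placement_id(fd, PLACEMENT_NAPI, &placement_id);
 *	if (placement_id != -1) {
 *		spdk_sock_map_insert(&g_map, placement_id, group_impl);
 *	}
 *
 *	// ...and in the matching group_impl_remove_sock() callback:
 *	if (placement_id != -1) {
 *		spdk_sock_map_release(&g_map, placement_id);
 *	}
 */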

#ifdef __cplusplus
}
#endif

#endif /* SPDK_INTERNAL_SOCK_H */