/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation. All rights reserved.
 *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
 */

/** \file
 * TCP network implementation abstraction layer
 */

#ifndef SPDK_INTERNAL_SOCK_H
#define SPDK_INTERNAL_SOCK_H

#include "spdk/stdinc.h"
#include "spdk/sock.h"
#include "spdk/queue.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/trace.h"
#include "spdk_internal/trace_defs.h"

#ifdef __cplusplus
extern "C" {
#endif

#define MAX_EVENTS_PER_POLL 32
#define DEFAULT_SOCK_PRIORITY 0
#define MIN_SOCK_PIPE_SIZE 1024
#define DEFAULT_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define DEFAULT_SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_RCVBUF_SIZE (4 * 1024)
#define MIN_SO_SNDBUF_SIZE (4 * 1024)
#define IOV_BATCH_SIZE 64

struct spdk_sock {
	struct spdk_net_impl		*net_impl;
	struct spdk_sock_opts		opts;
	struct spdk_sock_group_impl	*group_impl;
	TAILQ_ENTRY(spdk_sock)		link;

	TAILQ_HEAD(, spdk_sock_request)	queued_reqs;
	TAILQ_HEAD(, spdk_sock_request)	pending_reqs;
	struct spdk_sock_request	*read_req;
	int				queued_iovcnt;
	int				cb_cnt;
	spdk_sock_cb			cb_fn;
	void				*cb_arg;
	struct {
		uint8_t		closed		: 1;
		uint8_t		reserved	: 7;
	} flags;
	struct spdk_sock_impl_opts	impl_opts;
};

struct spdk_sock_group_provided_buf {
	size_t						len;
	void						*ctx;
	STAILQ_ENTRY(spdk_sock_group_provided_buf)	link;
};

struct spdk_sock_group {
	STAILQ_HEAD(, spdk_sock_group_impl)	group_impls;
	STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
	void					*ctx;
};

struct spdk_sock_group_impl {
	struct spdk_net_impl			*net_impl;
	struct spdk_sock_group			*group;
	TAILQ_HEAD(, spdk_sock)			socks;
	STAILQ_ENTRY(spdk_sock_group_impl)	link;
};

struct spdk_sock_map {
	STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
	pthread_mutex_t mtx;
};

struct spdk_net_impl {
	const char *name;

	int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
		       int clen, uint16_t *cport);
	struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*accept)(struct spdk_sock *sock);
	int (*close)(struct spdk_sock *sock);
	ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	int (*flush)(struct spdk_sock *sock);

	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
	int (*set_sendbuf)(struct spdk_sock *sock, int sz);

	bool (*is_ipv6)(struct spdk_sock *sock);
	bool (*is_ipv4)(struct spdk_sock *sock);
	bool (*is_connected)(struct spdk_sock *sock);

	struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
			struct spdk_sock_group_impl *hint);
	struct spdk_sock_group_impl *(*group_impl_create)(void);
	int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
			       struct spdk_sock **socks);
	int (*group_impl_close)(struct spdk_sock_group_impl *group);

	int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
	int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);

	STAILQ_ENTRY(spdk_net_impl) link;
};

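/**
 * Register a network implementation with the sock abstraction layer.
 * Implementations normally invoke this from a constructor via the
 * SPDK_NET_IMPL_REGISTER() macros below rather than calling it directly.
 */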
void spdk_net_impl_register(struct spdk_net_impl *impl);

#define SPDK_NET_IMPL_REGISTER(name, impl) \
static void __attribute__((constructor)) net_impl_register_##name(void) \
{ \
	spdk_net_impl_register(impl); \
}

#define SPDK_NET_IMPL_REGISTER_DEFAULT(name, impl) \
static void __attribute__((constructor)) net_impl_register_default_##name(void) \
{ \
	spdk_net_impl_register(impl); \
	spdk_sock_set_default_impl(SPDK_STRINGIFY(name)); \
}
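
/*
 * Illustrative sketch (not taken from any real module): a transport would
 * typically define its callback table and register it at load time. The
 * names "example", "example_connect", and "g_example_net_impl" are
 * hypothetical.
 *
 *	static struct spdk_sock *example_connect(const char *ip, int port,
 *						 struct spdk_sock_opts *opts);
 *	// ... remaining callbacks declared similarly ...
 *
 *	static struct spdk_net_impl g_example_net_impl = {
 *		.name = "example",
 *		.connect = example_connect,
 *		// ... remaining callbacks assigned here ...
 *	};
 *
 *	SPDK_NET_IMPL_REGISTER_DEFAULT(example, &g_example_net_impl);
 */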
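/**
 * Retrieve a buffer from the group's pool of user-provided receive buffers.
 * On success, *buf and *ctx are set and the buffer's length is returned;
 * 0 is returned when no buffer is available.
 */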
size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);

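/**
 * Append a write request to the socket's queued_reqs list and account for
 * its iovecs in queued_iovcnt.
 */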
static inline void
spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == NULL);
	if (spdk_trace_tpoint_enabled(TRACE_SOCK_REQ_QUEUE)) {
		uint64_t len = 0;
		int i;

		for (i = 0; i < req->iovcnt; i++) {
			len += SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
		}
		spdk_trace_record(TRACE_SOCK_REQ_QUEUE, 0, len, (uintptr_t)req, (uintptr_t)req->cb_arg);
	}
	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = &sock->queued_reqs;
#endif
	sock->queued_iovcnt += req->iovcnt;
}

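/**
 * Move a request from the queued_reqs list to the pending_reqs list,
 * adjusting queued_iovcnt accordingly.
 */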
static inline void
spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == &sock->queued_reqs);
	spdk_trace_record(TRACE_SOCK_REQ_PEND, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
	assert(sock->queued_iovcnt >= req->iovcnt);
	sock->queued_iovcnt -= req->iovcnt;
	TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = &sock->pending_reqs;
#endif
}

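/**
 * Invoke a request's completion callback with the given error code.
 * Returns -1 if the callback closed the socket (the socket is then freed),
 * 0 otherwise.
 */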
static inline int
spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	bool closed;
	int rc = 0;

	spdk_trace_record(TRACE_SOCK_REQ_COMPLETE, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
	req->internal.offset = 0;
	req->internal.is_zcopy = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;
	req->cb_fn(req->cb_arg, err);
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}

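/**
 * Remove a request from the pending_reqs list and complete it with the
 * given error code. Returns the result of spdk_sock_request_complete().
 */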
static inline int
spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	assert(req->internal.curr_list == &sock->pending_reqs);
	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = NULL;
#endif
	return spdk_sock_request_complete(sock, req, err);
}
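
/*
 * Typical asynchronous write flow implied by the helpers above (a sketch,
 * not any specific module's code): the implementation's writev_async()
 * callback queues the request, the flush path moves it to the pending list
 * once its data has been submitted to the kernel, and the request is
 * released when the transfer completes.
 *
 *	// in writev_async():
 *	spdk_sock_request_queue(sock, req);
 *
 *	// once the request's data has been submitted:
 *	spdk_sock_request_pend(sock, req);
 *
 *	// when the transfer completes:
 *	rc = spdk_sock_request_put(sock, req, 0);
 *	// rc == -1 means the completion callback closed (and freed) the socket
 */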
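/**
 * Fail every outstanding request on the socket (pending, queued, and any
 * in-progress read) with -ECANCELED. Returns -1 if a callback closed the
 * socket, 0 otherwise.
 */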
static inline int
spdk_sock_abort_requests(struct spdk_sock *sock)
{
	struct spdk_sock_request *req;
	bool closed;
	int rc = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;

	req = TAILQ_FIRST(&sock->pending_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->pending_reqs);
		TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->pending_reqs);
	}

	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->queued_reqs);
		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		assert(sock->queued_iovcnt >= req->iovcnt);
		sock->queued_iovcnt -= req->iovcnt;

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	req = sock->read_req;
	if (req != NULL) {
		sock->read_req = NULL;
		req->cb_fn(req->cb_arg, -ECANCELED);
	}
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	assert(TAILQ_EMPTY(&sock->queued_reqs));
	assert(TAILQ_EMPTY(&sock->pending_reqs));

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}

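/**
 * Fill iovs[], starting at index, with the unsent portion of a single
 * request, honoring the request's internal offset. Returns the resulting
 * iovec count; when num_bytes is non-NULL, the bytes added are accumulated
 * into it.
 */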
static inline int
spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
		   uint64_t *num_bytes)
{
	unsigned int offset;
	int iovcnt, i;

	assert(index < IOV_BATCH_SIZE);
	offset = req->internal.offset;
	iovcnt = index;

	for (i = 0; i < req->iovcnt; i++) {
		/* Consume any offset first */
		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
			continue;
		}

		iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
		iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
		if (num_bytes != NULL) {
			*num_bytes += iovs[iovcnt].iov_len;
		}

		iovcnt++;
		offset = 0;

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}
	}

	return iovcnt;
}

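/**
 * Gather up to IOV_BATCH_SIZE iovecs from the socket's queued requests,
 * starting at index. If last_req is provided, gathering resumes after
 * *last_req and *last_req is updated to the last request that fit entirely.
 * When MSG_ZEROCOPY is set in *flags but the gathered size is below the
 * zerocopy threshold, the flag is cleared.
 */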
static inline int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
		    struct spdk_sock_request **last_req, int *flags)
{
	int iovcnt;
	struct spdk_sock_request *req;
	uint64_t total = 0;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:

#if defined(MSG_ZEROCOPY)
	/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
	if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
		*flags = *flags & (~MSG_ZEROCOPY);
	}
#endif

	return iovcnt;
}
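
/*
 * Simplified sketch of how a flush routine might use spdk_sock_prep_reqs();
 * illustrative only, not the actual posix/uring implementation. The helper
 * example_sock_fd() is hypothetical.
 *
 *	struct iovec iovs[IOV_BATCH_SIZE];
 *	struct spdk_sock_request *last = NULL;
 *	int iovcnt, flags = 0;
 *	ssize_t n;
 *
 *	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, &last, &flags);
 *	if (iovcnt == 0) {
 *		return 0;
 *	}
 *
 *	n = writev(example_sock_fd(sock), iovs, iovcnt);
 *	if (n < 0) {
 *		return -errno;
 *	}
 *
 *	// The caller would then walk sock->queued_reqs, advancing each
 *	// request's internal offset by the bytes written, moving fully
 *	// written requests to the pending list with spdk_sock_request_pend()
 *	// and completing them via spdk_sock_request_put() when appropriate.
 */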
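/**
 * Derive a placement id for the given file descriptor according to the
 * placement mode (incoming NAPI id or incoming CPU). *placement_id is set
 * to -1 when no id is available.
 */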
static inline void
spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
{
	*placement_id = -1;

	switch (mode) {
	case PLACEMENT_NONE:
		break;
	case PLACEMENT_MARK:
	case PLACEMENT_NAPI: {
#if defined(SO_INCOMING_NAPI_ID)
		socklen_t len = sizeof(int);

		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
		if (rc == -1) {
			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
			assert(false);
		}
#endif
		break;
	}
	case PLACEMENT_CPU: {
#if defined(SO_INCOMING_CPU)
		socklen_t len = sizeof(int);

		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
		if (rc == -1) {
			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
			assert(false);
		}
#endif
		break;
	}
	default:
		break;
	}
}

/**
 * Insert a group into the placement map.
 * If the group is already in the map, take a reference.
 */
int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl *group_impl);

/**
 * Release a reference for the given placement_id. If the reference count goes to 0, the
 * entry will no longer be associated with a group.
 */
void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);

/**
 * Look up the group for the given placement_id.
 */
int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint);

/**
 * Find a placement id with no associated group
 */
int spdk_sock_map_find_free(struct spdk_sock_map *map);

/**
 * Clean up all memory associated with the given map
 */
void spdk_sock_map_cleanup(struct spdk_sock_map *map);
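
/*
 * Illustrative use of the placement map (hypothetical names): a module
 * could keep a static map and resolve the optimal group for a socket from
 * its placement id. "g_example_map" and "example_sock_fd" are not part of
 * SPDK.
 *
 *	static struct spdk_sock_map g_example_map = {
 *		.entries = STAILQ_HEAD_INITIALIZER(g_example_map.entries),
 *		.mtx = PTHREAD_MUTEX_INITIALIZER,
 *	};
 *
 *	static struct spdk_sock_group_impl *
 *	example_group_impl_get_optimal(struct spdk_sock *sock,
 *				       struct spdk_sock_group_impl *hint)
 *	{
 *		struct spdk_sock_group_impl *group = NULL;
 *		int placement_id;
 *
 *		spdk_sock_get_placement_id(example_sock_fd(sock), PLACEMENT_NAPI,
 *					   &placement_id);
 *		if (placement_id != -1) {
 *			spdk_sock_map_lookup(&g_example_map, placement_id, &group, hint);
 *		}
 *
 *		return group;
 *	}
 */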

#ifdef __cplusplus
}
#endif

#endif /* SPDK_INTERNAL_SOCK_H */