xref: /spdk/include/spdk_internal/sock.h (revision 7ff7ec0ed88d5acad07010b6c577326debc22f7c)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
4  */
5 
6 /** \file
7  * TCP network implementation abstraction layer
8  */
9 
10 #ifndef SPDK_INTERNAL_SOCK_H
11 #define SPDK_INTERNAL_SOCK_H
12 
13 #include "spdk/stdinc.h"
14 #include "spdk/sock.h"
15 #include "spdk/queue.h"
16 #include "spdk/likely.h"
17 #include "spdk/log.h"
18 #include "spdk/trace.h"
19 #include "spdk_internal/trace_defs.h"
20 
21 #ifdef __cplusplus
22 extern "C" {
23 #endif
24 
/* Tunables shared by the socket layer and its implementations. */
#define MAX_EVENTS_PER_POLL	32
#define DEFAULT_SOCK_PRIORITY	0
#define MIN_SOCK_PIPE_SIZE	1024

/*
 * Default and minimum values for the receive/send buffer sizes.
 * NOTE(review): units are presumably bytes (2 MiB default, 4 KiB minimum) - confirm
 * against the implementations that consume these.
 */
#define DEFAULT_SO_RCVBUF_SIZE	(2 * 1024 * 1024)
#define DEFAULT_SO_SNDBUF_SIZE	(2 * 1024 * 1024)
#define MIN_SO_RCVBUF_SIZE	(4 * 1024)
#define MIN_SO_SNDBUF_SIZE	(4 * 1024)

/* Upper bound on the iovec entries gathered by spdk_sock_prep_req()/spdk_sock_prep_reqs(). */
#define IOV_BATCH_SIZE		64
33 
34 struct spdk_sock {
35 	struct spdk_net_impl		*net_impl;
36 	struct spdk_sock_opts		opts;
37 	struct spdk_sock_group_impl	*group_impl;
38 	TAILQ_ENTRY(spdk_sock)		link;
39 
40 	TAILQ_HEAD(, spdk_sock_request)	queued_reqs;
41 	TAILQ_HEAD(, spdk_sock_request)	pending_reqs;
42 	struct spdk_sock_request	*read_req;
43 	int				queued_iovcnt;
44 	int				cb_cnt;
45 	spdk_sock_cb			cb_fn;
46 	void				*cb_arg;
47 	struct {
48 		uint8_t		closed		: 1;
49 		uint8_t		reserved	: 7;
50 	} flags;
51 	struct spdk_sock_impl_opts	impl_opts;
52 };
53 
/**
 * One entry of a socket group's buffer pool (see spdk_sock_group::pool).
 */
struct spdk_sock_group_provided_buf {
	/* Length associated with this buffer entry. */
	size_t len;
	/* Opaque context pointer stored alongside the buffer. */
	void *ctx;
	/* Singly-linked tail queue linkage. */
	STAILQ_ENTRY(spdk_sock_group_provided_buf) link;
};
59 
/**
 * A socket group: a list of implementation-specific groups plus a pool of
 * provided buffers.
 */
struct spdk_sock_group {
	/* Implementation-specific groups, linked through spdk_sock_group_impl::link. */
	STAILQ_HEAD(, spdk_sock_group_impl) group_impls;
	/* Buffer entries, linked through spdk_sock_group_provided_buf::link. */
	STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
	/* Opaque context pointer. */
	void *ctx;
};
65 
/**
 * Implementation-specific part of a socket group.
 */
struct spdk_sock_group_impl {
	/* Implementation (function table) this group belongs to. */
	struct spdk_net_impl *net_impl;
	/* Back pointer to the generic group. */
	struct spdk_sock_group *group;
	/* Sockets held by this group. */
	TAILQ_HEAD(, spdk_sock) socks;
	/* Linkage for spdk_sock_group::group_impls. */
	STAILQ_ENTRY(spdk_sock_group_impl) link;
};
72 
/**
 * Placement map operated on by the spdk_sock_map_*() functions.
 */
struct spdk_sock_map {
	/* Map entries; the entry type is defined outside this header. */
	STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
	/* NOTE(review): presumably guards 'entries' - confirm in the implementation. */
	pthread_mutex_t mtx;
};
77 
78 struct spdk_net_impl {
79 	const char *name;
80 
81 	int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
82 		       int clen, uint16_t *cport);
83 	struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
84 	struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
85 	struct spdk_sock *(*accept)(struct spdk_sock *sock);
86 	int (*close)(struct spdk_sock *sock);
87 	ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
88 	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
89 	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
90 
91 	int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
92 	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
93 	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
94 	int (*flush)(struct spdk_sock *sock);
95 
96 	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
97 	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
98 	int (*set_sendbuf)(struct spdk_sock *sock, int sz);
99 
100 	bool (*is_ipv6)(struct spdk_sock *sock);
101 	bool (*is_ipv4)(struct spdk_sock *sock);
102 	bool (*is_connected)(struct spdk_sock *sock);
103 
104 	struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
105 			struct spdk_sock_group_impl *hint);
106 	struct spdk_sock_group_impl *(*group_impl_create)(void);
107 	int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
108 	int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
109 	int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
110 			       struct spdk_sock **socks);
111 	int (*group_impl_register_interrupt)(struct spdk_sock_group_impl *group, uint32_t events,
112 					     spdk_interrupt_fn fn, void *arg, const char *name);
113 	void (*group_impl_unregister_interrupt)(struct spdk_sock_group_impl *group);
114 	int (*group_impl_close)(struct spdk_sock_group_impl *group);
115 
116 	int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
117 	int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);
118 
119 	STAILQ_ENTRY(spdk_net_impl) link;
120 };
121 
/**
 * Register a network implementation.
 *
 * Invoked from the constructor functions generated by SPDK_NET_IMPL_REGISTER()
 * and SPDK_NET_IMPL_REGISTER_DEFAULT().
 *
 * \param impl Implementation function table to register.
 */
void spdk_net_impl_register(struct spdk_net_impl *impl);
123 
/*
 * Emit a constructor function named net_impl_register_<name> that passes
 * 'impl' to spdk_net_impl_register() when the constructor runs.
 */
#define SPDK_NET_IMPL_REGISTER(name, impl)					\
static void __attribute__((constructor)) net_impl_register_##name(void)	\
{										\
	spdk_net_impl_register(impl);						\
}
129 
/*
 * Like SPDK_NET_IMPL_REGISTER(), but the generated constructor
 * (net_impl_register_default_<name>) additionally calls
 * spdk_sock_set_default_impl() with the stringified 'name'.
 */
#define SPDK_NET_IMPL_REGISTER_DEFAULT(name, impl)					\
static void __attribute__((constructor)) net_impl_register_default_##name(void)	\
{											\
	spdk_net_impl_register(impl);							\
	spdk_sock_set_default_impl(SPDK_STRINGIFY(name));				\
}
136 
/**
 * Obtain a buffer from a socket group.
 *
 * NOTE(review): presumably takes an entry from the group's provided-buffer
 * pool and returns its length - confirm against the definition in sock.c.
 *
 * \param group Socket group to query.
 * \param buf Output location for the buffer pointer.
 * \param ctx Output location for the context pointer associated with the buffer.
 */
size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);
138 
139 static inline void
140 spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
141 {
142 	assert(req->internal.curr_list == NULL);
143 	if (spdk_trace_tpoint_enabled(TRACE_SOCK_REQ_QUEUE)) {
144 		uint64_t len = 0;
145 		int i;
146 
147 		for (i = 0; i < req->iovcnt; i++) {
148 			len += SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
149 		}
150 		spdk_trace_record(TRACE_SOCK_REQ_QUEUE, 0, len, (uintptr_t)req, (uintptr_t)req->cb_arg);
151 	}
152 	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
153 #ifdef DEBUG
154 	req->internal.curr_list = &sock->queued_reqs;
155 #endif
156 	sock->queued_iovcnt += req->iovcnt;
157 }
158 
159 static inline void
160 spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
161 {
162 	assert(req->internal.curr_list == &sock->queued_reqs);
163 	spdk_trace_record(TRACE_SOCK_REQ_PEND, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
164 	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
165 	assert(sock->queued_iovcnt >= req->iovcnt);
166 	sock->queued_iovcnt -= req->iovcnt;
167 	TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
168 #ifdef DEBUG
169 	req->internal.curr_list = &sock->pending_reqs;
170 #endif
171 }
172 
173 static inline int
174 spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
175 {
176 	bool closed;
177 	int rc = 0;
178 
179 	spdk_trace_record(TRACE_SOCK_REQ_COMPLETE, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
180 	req->internal.offset = 0;
181 	req->internal.is_zcopy = 0;
182 
183 	closed = sock->flags.closed;
184 	sock->cb_cnt++;
185 	req->cb_fn(req->cb_arg, err);
186 	assert(sock->cb_cnt > 0);
187 	sock->cb_cnt--;
188 
189 	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
190 		/* The user closed the socket in response to a callback above. */
191 		rc = -1;
192 		spdk_sock_close(&sock);
193 	}
194 
195 	return rc;
196 }
197 
198 static inline int
199 spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
200 {
201 	assert(req->internal.curr_list == &sock->pending_reqs);
202 	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
203 #ifdef DEBUG
204 	req->internal.curr_list = NULL;
205 #endif
206 	return spdk_sock_request_complete(sock, req, err);
207 }
208 
209 static inline int
210 spdk_sock_abort_requests(struct spdk_sock *sock)
211 {
212 	struct spdk_sock_request *req;
213 	bool closed;
214 	int rc = 0;
215 
216 	closed = sock->flags.closed;
217 	sock->cb_cnt++;
218 
219 	req = TAILQ_FIRST(&sock->pending_reqs);
220 	while (req) {
221 		assert(req->internal.curr_list == &sock->pending_reqs);
222 		TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
223 #ifdef DEBUG
224 		req->internal.curr_list = NULL;
225 #endif
226 
227 		req->cb_fn(req->cb_arg, -ECANCELED);
228 
229 		req = TAILQ_FIRST(&sock->pending_reqs);
230 	}
231 
232 	req = TAILQ_FIRST(&sock->queued_reqs);
233 	while (req) {
234 		assert(req->internal.curr_list == &sock->queued_reqs);
235 		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
236 #ifdef DEBUG
237 		req->internal.curr_list = NULL;
238 #endif
239 
240 		assert(sock->queued_iovcnt >= req->iovcnt);
241 		sock->queued_iovcnt -= req->iovcnt;
242 
243 		req->cb_fn(req->cb_arg, -ECANCELED);
244 
245 		req = TAILQ_FIRST(&sock->queued_reqs);
246 	}
247 
248 	req = sock->read_req;
249 	if (req != NULL) {
250 		sock->read_req = NULL;
251 		req->cb_fn(req->cb_arg, -ECANCELED);
252 	}
253 	assert(sock->cb_cnt > 0);
254 	sock->cb_cnt--;
255 
256 	assert(TAILQ_EMPTY(&sock->queued_reqs));
257 	assert(TAILQ_EMPTY(&sock->pending_reqs));
258 
259 	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
260 		/* The user closed the socket in response to a callback above. */
261 		rc = -1;
262 		spdk_sock_close(&sock);
263 	}
264 
265 	return rc;
266 }
267 
268 static inline int
269 spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
270 		   uint64_t *num_bytes)
271 {
272 	unsigned int offset;
273 	int iovcnt, i;
274 
275 	assert(index < IOV_BATCH_SIZE);
276 	offset = req->internal.offset;
277 	iovcnt = index;
278 
279 	for (i = 0; i < req->iovcnt; i++) {
280 		/* Consume any offset first */
281 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
282 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
283 			continue;
284 		}
285 
286 		iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
287 		iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
288 		if (num_bytes != NULL) {
289 			*num_bytes += iovs[iovcnt].iov_len;
290 		}
291 
292 		iovcnt++;
293 		offset = 0;
294 
295 		if (iovcnt >= IOV_BATCH_SIZE) {
296 			break;
297 		}
298 	}
299 
300 	return iovcnt;
301 }
302 
303 static inline int
304 spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
305 		    struct spdk_sock_request **last_req, int *flags)
306 {
307 	int iovcnt;
308 	struct spdk_sock_request *req;
309 	uint64_t total = 0;
310 
311 	/* Gather an iov */
312 	iovcnt = index;
313 	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
314 		goto end;
315 	}
316 
317 	if (last_req != NULL && *last_req != NULL) {
318 		req = TAILQ_NEXT(*last_req, internal.link);
319 	} else {
320 		req = TAILQ_FIRST(&_sock->queued_reqs);
321 	}
322 
323 	while (req) {
324 		iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
325 		if (iovcnt >= IOV_BATCH_SIZE) {
326 			break;
327 		}
328 
329 		if (last_req != NULL) {
330 			*last_req = req;
331 		}
332 		req = TAILQ_NEXT(req, internal.link);
333 	}
334 
335 end:
336 
337 #if defined(MSG_ZEROCOPY)
338 	/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
339 	if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
340 		*flags = *flags & (~MSG_ZEROCOPY);
341 	}
342 #endif
343 
344 	return iovcnt;
345 }
346 
347 static inline void
348 spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
349 {
350 	*placement_id = -1;
351 
352 	switch (mode) {
353 	case PLACEMENT_NONE:
354 		break;
355 	case PLACEMENT_MARK:
356 	case PLACEMENT_NAPI: {
357 #if defined(SO_INCOMING_NAPI_ID)
358 		socklen_t len = sizeof(int);
359 
360 		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
361 		if (rc == -1) {
362 			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
363 			assert(false);
364 		}
365 #endif
366 		break;
367 	}
368 	case PLACEMENT_CPU: {
369 #if defined(SO_INCOMING_CPU)
370 		socklen_t len = sizeof(int);
371 
372 		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
373 		if (rc == -1) {
374 			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
375 			assert(false);
376 		}
377 #endif
378 		break;
379 	}
380 	default:
381 		break;
382 	}
383 }
384 
/**
 * Add a group to the placement map under the given placement id.
 * If the group is already present in the map, a reference is taken instead.
 *
 * \param map Placement map to modify.
 * \param placement_id Placement id to associate with the group.
 * \param group_impl Group to insert.
 */
int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl *group_impl);

/**
 * Drop one reference for the given placement id. When the reference count
 * reaches 0, the entry is no longer associated with a group.
 *
 * \param map Placement map to modify.
 * \param placement_id Placement id whose reference is released.
 */
void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);

/**
 * Look up the group associated with the given placement id.
 *
 * \param map Placement map to search.
 * \param placement_id Placement id to look up.
 * \param group_impl Output location for the group that was found.
 * \param hint Group hint supplied by the caller; how it is used is decided by
 * the implementation, which is not visible in this header.
 */
int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl **group_impl,
			 struct spdk_sock_group_impl *hint);

/**
 * Find a placement id that has no group associated with it.
 *
 * \param map Placement map to search.
 */
int spdk_sock_map_find_free(struct spdk_sock_map *map);

/**
 * Release all memory associated with the given map.
 *
 * \param map Placement map to clean up.
 */
void spdk_sock_map_cleanup(struct spdk_sock_map *map);
413 
414 #ifdef __cplusplus
415 }
416 #endif
417 
418 #endif /* SPDK_INTERNAL_SOCK_H */
419