xref: /spdk/include/spdk_internal/sock.h (revision c6c1234de9e0015e670dd0b51bf6ce39ee0e07bd)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
4  */
5 
6 /** \file
7  * TCP network implementation abstraction layer
8  */
9 
10 #ifndef SPDK_INTERNAL_SOCK_H
11 #define SPDK_INTERNAL_SOCK_H
12 
13 #include "spdk/stdinc.h"
14 #include "spdk/sock.h"
15 #include "spdk/queue.h"
16 #include "spdk/likely.h"
17 #include "spdk/log.h"
18 #include "spdk/trace.h"
19 #include "spdk_internal/trace_defs.h"
20 
21 #ifdef __cplusplus
22 extern "C" {
23 #endif
24 
/*
 * Tunables shared by socket implementations. Apart from IOV_BATCH_SIZE, none of
 * these are referenced in this header; the descriptions below are inferred from
 * the names and should be confirmed against the implementations that use them.
 */
/* NOTE(review): presumably the max number of sockets reported per group poll — confirm. */
#define MAX_EVENTS_PER_POLL 32
/* NOTE(review): presumably the default socket priority option value — confirm. */
#define DEFAULT_SOCK_PRIORITY 0
/* NOTE(review): presumably the smallest accepted receive pipe size, in bytes — confirm. */
#define MIN_SOCK_PIPE_SIZE 1024
/* Default and minimum SO_RCVBUF / SO_SNDBUF sizes, in bytes (per the names). */
#define DEFAULT_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define DEFAULT_SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_RCVBUF_SIZE (4 * 1024)
#define MIN_SO_SNDBUF_SIZE (4 * 1024)
/* Maximum number of iovecs gathered into one batch by spdk_sock_prep_req()/spdk_sock_prep_reqs(). */
#define IOV_BATCH_SIZE 64
33 
/**
 * Generic socket object shared by all net implementations. The request
 * bookkeeping (queued_reqs, pending_reqs, read_req, queued_iovcnt, cb_cnt,
 * flags.closed) is maintained by the inline helpers in this header.
 */
struct spdk_sock {
	/* Backend operation table this socket belongs to. */
	struct spdk_net_impl		*net_impl;
	/* NOTE(review): presumably the options the socket was created with — confirm. */
	struct spdk_sock_opts		opts;
	/* Backend-specific poll group the socket is in, presumably NULL when not in a group. */
	struct spdk_sock_group_impl	*group_impl;
	/* List linkage; presumably used for spdk_sock_group_impl::socks. */
	TAILQ_ENTRY(spdk_sock)		link;

	/* Write requests added by spdk_sock_request_queue(), not yet moved to pending_reqs. */
	TAILQ_HEAD(, spdk_sock_request)	queued_reqs;
	/* Requests moved here by spdk_sock_request_pend(); removed by spdk_sock_request_put(). */
	TAILQ_HEAD(, spdk_sock_request)	pending_reqs;
	/* Outstanding read request, if any; cancelled by spdk_sock_abort_requests(). */
	struct spdk_sock_request	*read_req;
	/* Sum of iovcnt over all requests currently on queued_reqs. */
	int				queued_iovcnt;
	/* Number of request callbacks currently executing. A close requested while
	 * this is non-zero is finished after the outermost callback returns. */
	int				cb_cnt;
	/* NOTE(review): presumably the user's socket event callback and its argument — confirm. */
	spdk_sock_cb			cb_fn;
	void				*cb_arg;
	struct {
		/* Set once the user has closed the socket; checked after callbacks. */
		uint8_t		closed		: 1;
		uint8_t		reserved	: 7;
	} flags;
	/* Implementation options; zerocopy_threshold is read by spdk_sock_prep_reqs(). */
	struct spdk_sock_impl_opts	impl_opts;
};
53 
/**
 * Entry in a sock group's pool of provided buffers (spdk_sock_group::pool).
 * NOTE(review): presumably handed out through spdk_sock_group_get_buf() — confirm.
 */
struct spdk_sock_group_provided_buf {
	/* Buffer length in bytes (per the name). */
	size_t						len;
	/* Opaque context associated with the buffer. */
	void						*ctx;
	/* Linkage in spdk_sock_group::pool. */
	STAILQ_ENTRY(spdk_sock_group_provided_buf)	link;
};
59 
/** Generic poll group aggregating implementation-specific groups. */
struct spdk_sock_group {
	/* Implementation-specific groups, linked through spdk_sock_group_impl::link
	 * (presumably one per net implementation — confirm). */
	STAILQ_HEAD(, spdk_sock_group_impl)	group_impls;
	/* Pool of user-provided receive buffers. */
	STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
	/* NOTE(review): presumably the user context passed at group creation — confirm. */
	void					*ctx;
};
65 
/** Per-implementation part of a poll group. */
struct spdk_sock_group_impl {
	/* Backend that owns this group. */
	struct spdk_net_impl			*net_impl;
	/* Generic group this impl-specific group belongs to. */
	struct spdk_sock_group			*group;
	/* Sockets in this group, presumably linked through spdk_sock::link. */
	TAILQ_HEAD(, spdk_sock)			socks;
	/* Linkage in spdk_sock_group::group_impls. */
	STAILQ_ENTRY(spdk_sock_group_impl)	link;
};
72 
/** Placement-id to group map used by the spdk_sock_map_*() functions below. */
struct spdk_sock_map {
	/* Map entries; struct spdk_sock_placement_id_entry is defined elsewhere. */
	STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
	/* NOTE(review): presumably serializes access to entries — confirm. */
	pthread_mutex_t mtx;
};
77 
/**
 * Operation table implemented by a socket backend ("net impl"). Tables are
 * registered with spdk_net_impl_register(), typically through the
 * SPDK_NET_IMPL_REGISTER* macros. The per-group notes below are inferred from
 * member names and signatures; the authoritative contracts are those of the
 * corresponding public spdk_sock_*() functions — confirm there.
 */
struct spdk_net_impl {
	/* Implementation name. SPDK_NET_IMPL_REGISTER_DEFAULT passes its stringified
	 * macro argument to spdk_sock_set_default_impl(), so the two are presumably
	 * expected to match. */
	const char *name;

	/* Per-socket queries and connection management. */
	int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
		       int clen, uint16_t *cport);
	const char *(*get_interface_name)(struct spdk_sock *sock);
	int32_t (*get_numa_socket_id)(struct spdk_sock *sock);
	struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*accept)(struct spdk_sock *sock);
	int (*close)(struct spdk_sock *sock);

	/* Synchronous data path. */
	ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	/* Asynchronous / zero-copy data path.
	 * NOTE(review): writev_async presumably queues via spdk_sock_request_queue()
	 * and flush() submits queued requests — confirm in the implementations. */
	int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	int (*flush)(struct spdk_sock *sock);

	/* Socket option setters. */
	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
	int (*set_sendbuf)(struct spdk_sock *sock, int sz);

	/* Address family / connection state predicates. */
	bool (*is_ipv6)(struct spdk_sock *sock);
	bool (*is_ipv4)(struct spdk_sock *sock);
	bool (*is_connected)(struct spdk_sock *sock);

	/* Poll-group operations.
	 * NOTE(review): group_impl_poll presumably stores up to max_events sockets
	 * with pending events in socks and returns their count — confirm. */
	struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
			struct spdk_sock_group_impl *hint);
	struct spdk_sock_group_impl *(*group_impl_create)(void);
	int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
			       struct spdk_sock **socks);
	int (*group_impl_register_interrupt)(struct spdk_sock_group_impl *group, uint32_t events,
					     spdk_interrupt_fn fn, void *arg, const char *name);
	void (*group_impl_unregister_interrupt)(struct spdk_sock_group_impl *group);
	int (*group_impl_close)(struct spdk_sock_group_impl *group);

	/* Read / update implementation-wide options. */
	int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
	int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);

	/* Linkage in the registry of implementations (presumably maintained by
	 * spdk_net_impl_register()). */
	STAILQ_ENTRY(spdk_net_impl) link;
};
123 
/**
 * Register a socket implementation's operation table. The
 * SPDK_NET_IMPL_REGISTER* macros call this from a constructor function, so it
 * normally runs at program/library load time.
 */
void spdk_net_impl_register(struct spdk_net_impl *impl);
125 
/*
 * Define a constructor function (net_impl_register_<name>) that registers
 * impl via spdk_net_impl_register() when the object is loaded. name only
 * forms the function identifier, so it must be a valid identifier fragment
 * that is unique per translation unit.
 */
#define SPDK_NET_IMPL_REGISTER(name, impl) \
static void __attribute__((constructor)) net_impl_register_##name(void) \
{ \
	spdk_net_impl_register(impl); \
}
131 
/*
 * Like SPDK_NET_IMPL_REGISTER, but the generated constructor additionally
 * selects the implementation as the default by passing the stringified name
 * to spdk_sock_set_default_impl().
 */
#define SPDK_NET_IMPL_REGISTER_DEFAULT(name, impl) \
static void __attribute__((constructor)) net_impl_register_default_##name(void) \
{ \
	spdk_net_impl_register(impl); \
	spdk_sock_set_default_impl(SPDK_STRINGIFY(name)); \
}
138 
/**
 * Take a buffer from the group's provided-buffer pool.
 *
 * NOTE(review): presumably returns the buffer length and stores the buffer and
 * its context in *buf / *ctx, returning 0 when the pool is empty — confirm in
 * the implementation.
 */
size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);
140 
141 static inline void
142 spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
143 {
144 	assert(req->internal.curr_list == NULL);
145 	if (spdk_trace_tpoint_enabled(TRACE_SOCK_REQ_QUEUE)) {
146 		uint64_t len = 0;
147 		int i;
148 
149 		for (i = 0; i < req->iovcnt; i++) {
150 			len += SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
151 		}
152 		spdk_trace_record(TRACE_SOCK_REQ_QUEUE, 0, len, (uintptr_t)req, (uintptr_t)req->cb_arg);
153 	}
154 	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
155 #ifdef DEBUG
156 	req->internal.curr_list = &sock->queued_reqs;
157 #endif
158 	sock->queued_iovcnt += req->iovcnt;
159 }
160 
161 static inline void
162 spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
163 {
164 	assert(req->internal.curr_list == &sock->queued_reqs);
165 	spdk_trace_record(TRACE_SOCK_REQ_PEND, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
166 	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
167 	assert(sock->queued_iovcnt >= req->iovcnt);
168 	sock->queued_iovcnt -= req->iovcnt;
169 	TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
170 #ifdef DEBUG
171 	req->internal.curr_list = &sock->pending_reqs;
172 #endif
173 }
174 
175 static inline int
176 spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
177 {
178 	bool closed;
179 	int rc = 0;
180 
181 	spdk_trace_record(TRACE_SOCK_REQ_COMPLETE, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
182 	req->internal.offset = 0;
183 	req->internal.is_zcopy = 0;
184 
185 	closed = sock->flags.closed;
186 	sock->cb_cnt++;
187 	req->cb_fn(req->cb_arg, err);
188 	assert(sock->cb_cnt > 0);
189 	sock->cb_cnt--;
190 
191 	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
192 		/* The user closed the socket in response to a callback above. */
193 		rc = -1;
194 		spdk_sock_close(&sock);
195 	}
196 
197 	return rc;
198 }
199 
200 static inline int
201 spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
202 {
203 	assert(req->internal.curr_list == &sock->pending_reqs);
204 	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
205 #ifdef DEBUG
206 	req->internal.curr_list = NULL;
207 #endif
208 	return spdk_sock_request_complete(sock, req, err);
209 }
210 
211 static inline int
212 spdk_sock_abort_requests(struct spdk_sock *sock)
213 {
214 	struct spdk_sock_request *req;
215 	bool closed;
216 	int rc = 0;
217 
218 	closed = sock->flags.closed;
219 	sock->cb_cnt++;
220 
221 	req = TAILQ_FIRST(&sock->pending_reqs);
222 	while (req) {
223 		assert(req->internal.curr_list == &sock->pending_reqs);
224 		TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
225 #ifdef DEBUG
226 		req->internal.curr_list = NULL;
227 #endif
228 
229 		req->cb_fn(req->cb_arg, -ECANCELED);
230 
231 		req = TAILQ_FIRST(&sock->pending_reqs);
232 	}
233 
234 	req = TAILQ_FIRST(&sock->queued_reqs);
235 	while (req) {
236 		assert(req->internal.curr_list == &sock->queued_reqs);
237 		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
238 #ifdef DEBUG
239 		req->internal.curr_list = NULL;
240 #endif
241 
242 		assert(sock->queued_iovcnt >= req->iovcnt);
243 		sock->queued_iovcnt -= req->iovcnt;
244 
245 		req->cb_fn(req->cb_arg, -ECANCELED);
246 
247 		req = TAILQ_FIRST(&sock->queued_reqs);
248 	}
249 
250 	req = sock->read_req;
251 	if (req != NULL) {
252 		sock->read_req = NULL;
253 		req->cb_fn(req->cb_arg, -ECANCELED);
254 	}
255 	assert(sock->cb_cnt > 0);
256 	sock->cb_cnt--;
257 
258 	assert(TAILQ_EMPTY(&sock->queued_reqs));
259 	assert(TAILQ_EMPTY(&sock->pending_reqs));
260 
261 	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
262 		/* The user closed the socket in response to a callback above. */
263 		rc = -1;
264 		spdk_sock_close(&sock);
265 	}
266 
267 	return rc;
268 }
269 
270 static inline int
271 spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
272 		   uint64_t *num_bytes)
273 {
274 	unsigned int offset;
275 	int iovcnt, i;
276 
277 	assert(index < IOV_BATCH_SIZE);
278 	offset = req->internal.offset;
279 	iovcnt = index;
280 
281 	for (i = 0; i < req->iovcnt; i++) {
282 		/* Consume any offset first */
283 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
284 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
285 			continue;
286 		}
287 
288 		iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
289 		iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
290 		if (num_bytes != NULL) {
291 			*num_bytes += iovs[iovcnt].iov_len;
292 		}
293 
294 		iovcnt++;
295 		offset = 0;
296 
297 		if (iovcnt >= IOV_BATCH_SIZE) {
298 			break;
299 		}
300 	}
301 
302 	return iovcnt;
303 }
304 
305 static inline int
306 spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
307 		    struct spdk_sock_request **last_req, int *flags)
308 {
309 	int iovcnt;
310 	struct spdk_sock_request *req;
311 	uint64_t total = 0;
312 
313 	/* Gather an iov */
314 	iovcnt = index;
315 	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
316 		goto end;
317 	}
318 
319 	if (last_req != NULL && *last_req != NULL) {
320 		req = TAILQ_NEXT(*last_req, internal.link);
321 	} else {
322 		req = TAILQ_FIRST(&_sock->queued_reqs);
323 	}
324 
325 	while (req) {
326 		iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
327 		if (iovcnt >= IOV_BATCH_SIZE) {
328 			break;
329 		}
330 
331 		if (last_req != NULL) {
332 			*last_req = req;
333 		}
334 		req = TAILQ_NEXT(req, internal.link);
335 	}
336 
337 end:
338 
339 #if defined(MSG_ZEROCOPY)
340 	/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
341 	if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
342 		*flags = *flags & (~MSG_ZEROCOPY);
343 	}
344 #endif
345 
346 	return iovcnt;
347 }
348 
349 static inline void
350 spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
351 {
352 	*placement_id = -1;
353 
354 	switch (mode) {
355 	case PLACEMENT_NONE:
356 		break;
357 	case PLACEMENT_MARK:
358 	case PLACEMENT_NAPI: {
359 #if defined(SO_INCOMING_NAPI_ID)
360 		socklen_t len = sizeof(int);
361 
362 		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
363 		if (rc == -1) {
364 			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
365 			assert(false);
366 		}
367 #endif
368 		break;
369 	}
370 	case PLACEMENT_CPU: {
371 #if defined(SO_INCOMING_CPU)
372 		socklen_t len = sizeof(int);
373 
374 		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
375 		if (rc == -1) {
376 			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
377 			assert(false);
378 		}
379 #endif
380 		break;
381 	}
382 	default:
383 		break;
384 	}
385 }
386 
/**
 * Insert a group into the placement map.
 * If the group is already in the map, take a reference.
 *
 * NOTE(review): return-value semantics are not visible here; presumably 0 on
 * success and a negative errno on failure — confirm in the implementation.
 */
int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl *group_impl);

/**
 * Release a reference for the given placement_id. If the reference count goes to 0, the
 * entry will no longer be associated with a group.
 */
void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);

/**
 * Look up the group for the given placement_id and store it in *group_impl.
 *
 * NOTE(review): the role of hint (presumably a fallback group used when the
 * placement_id has no entry yet) and the return-value convention are not
 * visible here — confirm in the implementation.
 */
int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint);

/**
 * Find a placement id with no associated group.
 *
 * NOTE(review): presumably returns that id, or a negative value if none is
 * free — confirm.
 */
int spdk_sock_map_find_free(struct spdk_sock_map *map);

/**
 * Clean up all memory associated with the given map.
 */
void spdk_sock_map_cleanup(struct spdk_sock_map *map);
415 
416 #ifdef __cplusplus
417 }
418 #endif
419 
420 #endif /* SPDK_INTERNAL_SOCK_H */
421