xref: /spdk/lib/jsonrpc/jsonrpc_server_tcp.c (revision fecffda6ecf8853b82edccde429b68252f0a62c5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "jsonrpc_internal.h"
7 #include "spdk/string.h"
8 #include "spdk/util.h"
9 
10 struct spdk_jsonrpc_server *
11 spdk_jsonrpc_server_listen(int domain, int protocol,
12 			   struct sockaddr *listen_addr, socklen_t addrlen,
13 			   spdk_jsonrpc_handle_request_fn handle_request)
14 {
15 	struct spdk_jsonrpc_server *server;
16 	int rc, val, i;
17 
18 	server = calloc(1, sizeof(struct spdk_jsonrpc_server));
19 	if (server == NULL) {
20 		return NULL;
21 	}
22 
23 	TAILQ_INIT(&server->free_conns);
24 	TAILQ_INIT(&server->conns);
25 
26 	for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
27 		TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
28 	}
29 
30 	server->handle_request = handle_request;
31 
32 	server->sockfd = socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
33 	if (server->sockfd < 0) {
34 		SPDK_ERRLOG("socket() failed\n");
35 		free(server);
36 		return NULL;
37 	}
38 
39 	val = 1;
40 	rc = setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
41 	if (rc != 0) {
42 		SPDK_ERRLOG("could not set SO_REUSEADDR sock option: %s\n", spdk_strerror(errno));
43 		close(server->sockfd);
44 		free(server);
45 		return NULL;
46 	}
47 
48 	rc = bind(server->sockfd, listen_addr, addrlen);
49 	if (rc != 0) {
50 		SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
51 		close(server->sockfd);
52 		free(server);
53 		return NULL;
54 	}
55 
56 	rc = listen(server->sockfd, 512);
57 	if (rc != 0) {
58 		SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
59 		close(server->sockfd);
60 		free(server);
61 		return NULL;
62 	}
63 
64 	return server;
65 }
66 
67 static struct spdk_jsonrpc_request *
68 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
69 {
70 	struct spdk_jsonrpc_request *request = NULL;
71 
72 	pthread_spin_lock(&conn->queue_lock);
73 	request = STAILQ_FIRST(&conn->send_queue);
74 	if (request) {
75 		STAILQ_REMOVE_HEAD(&conn->send_queue, link);
76 	}
77 	pthread_spin_unlock(&conn->queue_lock);
78 	return request;
79 }
80 
81 static void
82 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn)
83 {
84 	struct spdk_jsonrpc_request *request;
85 
86 	jsonrpc_free_request(conn->send_request);
87 	conn->send_request = NULL ;
88 	while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) {
89 		jsonrpc_free_request(request);
90 	}
91 }
92 
93 static void
94 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
95 {
96 	conn->closed = true;
97 
98 	if (conn->sockfd >= 0) {
99 		jsonrpc_server_free_conn_request(conn);
100 		close(conn->sockfd);
101 		conn->sockfd = -1;
102 
103 		if (conn->close_cb) {
104 			conn->close_cb(conn, conn->close_cb_ctx);
105 		}
106 	}
107 }
108 
109 void
110 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
111 {
112 	struct spdk_jsonrpc_server_conn *conn;
113 
114 	close(server->sockfd);
115 
116 	TAILQ_FOREACH(conn, &server->conns, link) {
117 		jsonrpc_server_conn_close(conn);
118 	}
119 
120 	free(server);
121 }
122 
123 static void
124 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
125 {
126 	struct spdk_jsonrpc_server *server = conn->server;
127 
128 	jsonrpc_server_conn_close(conn);
129 
130 	pthread_spin_destroy(&conn->queue_lock);
131 	assert(STAILQ_EMPTY(&conn->send_queue));
132 
133 	TAILQ_REMOVE(&server->conns, conn, link);
134 	TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
135 }
136 
137 int
138 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
139 			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
140 {
141 	int rc = 0;
142 
143 	pthread_spin_lock(&conn->queue_lock);
144 	if (conn->close_cb == NULL) {
145 		conn->close_cb = cb;
146 		conn->close_cb_ctx = ctx;
147 	} else {
148 		rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
149 	}
150 	pthread_spin_unlock(&conn->queue_lock);
151 
152 	return rc;
153 }
154 
155 int
156 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
157 			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
158 {
159 	int rc = 0;
160 
161 	pthread_spin_lock(&conn->queue_lock);
162 	if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
163 		rc = -ENOENT;
164 	} else {
165 		conn->close_cb = NULL;
166 	}
167 	pthread_spin_unlock(&conn->queue_lock);
168 
169 	return rc;
170 }
171 
172 static int
173 jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
174 {
175 	struct spdk_jsonrpc_server_conn *conn;
176 	int rc, flag;
177 
178 	rc = accept(server->sockfd, NULL, NULL);
179 	if (rc >= 0) {
180 		conn = TAILQ_FIRST(&server->free_conns);
181 		assert(conn != NULL);
182 
183 		conn->server = server;
184 		conn->sockfd = rc;
185 		conn->closed = false;
186 		conn->recv_len = 0;
187 		conn->outstanding_requests = 0;
188 		STAILQ_INIT(&conn->send_queue);
189 		conn->send_request = NULL;
190 
191 		if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) {
192 			SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd);
193 			close(conn->sockfd);
194 			return -1;
195 		}
196 
197 		flag = fcntl(conn->sockfd, F_GETFL);
198 		if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
199 			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
200 				    conn->sockfd, spdk_strerror(errno));
201 			close(conn->sockfd);
202 			pthread_spin_destroy(&conn->queue_lock);
203 			return -1;
204 		}
205 
206 		TAILQ_REMOVE(&server->free_conns, conn, link);
207 		TAILQ_INSERT_TAIL(&server->conns, conn, link);
208 		return 0;
209 	}
210 
211 	if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
212 		return 0;
213 	}
214 
215 	return -1;
216 }
217 
218 void
219 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
220 			      const struct spdk_json_val *method, const struct spdk_json_val *params)
221 {
222 	request->conn->server->handle_request(request, method, params);
223 }
224 
225 void
226 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
227 {
228 	const char *msg;
229 
230 	switch (error) {
231 	case SPDK_JSONRPC_ERROR_PARSE_ERROR:
232 		msg = "Parse error";
233 		break;
234 
235 	case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
236 		msg = "Invalid request";
237 		break;
238 
239 	case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
240 		msg = "Method not found";
241 		break;
242 
243 	case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
244 		msg = "Invalid parameters";
245 		break;
246 
247 	case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
248 		msg = "Internal error";
249 		break;
250 
251 	default:
252 		msg = "Error";
253 		break;
254 	}
255 
256 	spdk_jsonrpc_send_error_response(request, error, msg);
257 }
258 
259 static int
260 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
261 {
262 	ssize_t rc, offset;
263 	size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
264 
265 	rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
266 	if (rc == -1) {
267 		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
268 			return 0;
269 		}
270 		SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno));
271 		return -1;
272 	}
273 
274 	if (rc == 0) {
275 		SPDK_DEBUGLOG(rpc, "remote closed connection\n");
276 		conn->closed = true;
277 		return 0;
278 	}
279 
280 	conn->recv_len += rc;
281 
282 	offset = 0;
283 	do {
284 		rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
285 		if (rc < 0) {
286 			SPDK_ERRLOG("jsonrpc parse request failed\n");
287 			return -1;
288 		}
289 
290 		offset += rc;
291 	} while (rc > 0);
292 
293 	if (offset > 0) {
294 		/*
295 		 * Successfully parsed a requests - move any data past the end of the
296 		 * parsed requests down to the beginning.
297 		 */
298 		assert((size_t)offset <= conn->recv_len);
299 		memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
300 		conn->recv_len -= offset;
301 	}
302 
303 	return 0;
304 }
305 
306 void
307 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
308 {
309 	struct spdk_jsonrpc_server_conn *conn = request->conn;
310 
311 	/* Queue the response to be sent */
312 	pthread_spin_lock(&conn->queue_lock);
313 	STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
314 	pthread_spin_unlock(&conn->queue_lock);
315 }
316 
317 
318 static int
319 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
320 {
321 	struct spdk_jsonrpc_request *request;
322 	ssize_t rc;
323 
324 more:
325 	if (conn->outstanding_requests == 0) {
326 		return 0;
327 	}
328 
329 	if (conn->send_request == NULL) {
330 		conn->send_request = jsonrpc_server_dequeue_request(conn);
331 	}
332 
333 	request = conn->send_request;
334 	if (request == NULL) {
335 		/* Nothing to send right now */
336 		return 0;
337 	}
338 
339 	if (request->send_len > 0) {
340 		rc = send(conn->sockfd, request->send_buf + request->send_offset,
341 			  request->send_len, 0);
342 		if (rc < 0) {
343 			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
344 				return 0;
345 			}
346 
347 			SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno));
348 			return -1;
349 		}
350 
351 		request->send_offset += rc;
352 		request->send_len -= rc;
353 	}
354 
355 	if (request->send_len == 0) {
356 		/*
357 		 * Full response has been sent.
358 		 * Free it and set send_request to NULL to move on to the next queued response.
359 		 */
360 		conn->send_request = NULL;
361 		jsonrpc_free_request(request);
362 		goto more;
363 	}
364 
365 	return 0;
366 }
367 
368 int
369 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
370 {
371 	int rc;
372 	struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
373 
374 	TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
375 		/* If we can't receive and there are no outstanding requests close the connection. */
376 		if (conn->closed == true && conn->outstanding_requests == 0) {
377 			jsonrpc_server_conn_close(conn);
378 		}
379 
380 		if (conn->sockfd == -1 && conn->outstanding_requests == 0) {
381 			jsonrpc_server_conn_remove(conn);
382 		}
383 	}
384 
385 	/* Check listen socket */
386 	if (!TAILQ_EMPTY(&server->free_conns)) {
387 		jsonrpc_server_accept(server);
388 	}
389 
390 	TAILQ_FOREACH(conn, &server->conns, link) {
391 		if (conn->sockfd == -1) {
392 			continue;
393 		}
394 
395 		rc = jsonrpc_server_conn_send(conn);
396 		if (rc != 0) {
397 			jsonrpc_server_conn_close(conn);
398 			continue;
399 		}
400 
401 		if (!conn->closed) {
402 			rc = jsonrpc_server_conn_recv(conn);
403 			if (rc != 0) {
404 				jsonrpc_server_conn_close(conn);
405 			}
406 		}
407 	}
408 
409 	return 0;
410 }
411