xref: /spdk/lib/jsonrpc/jsonrpc_server_tcp.c (revision 2fb672af968d9495a388262bd76f63c74b347af1)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 */
5 
6 #include "jsonrpc_internal.h"
7 #include "spdk/string.h"
8 #include "spdk/util.h"
9 
10 struct spdk_jsonrpc_server *
spdk_jsonrpc_server_listen(int domain,int protocol,struct sockaddr * listen_addr,socklen_t addrlen,spdk_jsonrpc_handle_request_fn handle_request)11 spdk_jsonrpc_server_listen(int domain, int protocol,
12 			   struct sockaddr *listen_addr, socklen_t addrlen,
13 			   spdk_jsonrpc_handle_request_fn handle_request)
14 {
15 	struct spdk_jsonrpc_server *server;
16 	int rc, val, i;
17 
18 	server = calloc(1, sizeof(struct spdk_jsonrpc_server));
19 	if (server == NULL) {
20 		return NULL;
21 	}
22 
23 	TAILQ_INIT(&server->free_conns);
24 	TAILQ_INIT(&server->conns);
25 
26 	for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
27 		TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
28 	}
29 
30 	server->handle_request = handle_request;
31 
32 	server->sockfd = socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
33 	if (server->sockfd < 0) {
34 		SPDK_ERRLOG("socket() failed\n");
35 		free(server);
36 		return NULL;
37 	}
38 
39 	val = 1;
40 	rc = setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
41 	if (rc != 0) {
42 		SPDK_ERRLOG("could not set SO_REUSEADDR sock option: %s\n", spdk_strerror(errno));
43 		close(server->sockfd);
44 		free(server);
45 		return NULL;
46 	}
47 
48 	rc = bind(server->sockfd, listen_addr, addrlen);
49 	if (rc != 0) {
50 		SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
51 		close(server->sockfd);
52 		free(server);
53 		return NULL;
54 	}
55 
56 	rc = listen(server->sockfd, 512);
57 	if (rc != 0) {
58 		SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
59 		close(server->sockfd);
60 		free(server);
61 		return NULL;
62 	}
63 
64 	return server;
65 }
66 
67 static struct spdk_jsonrpc_request *
jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn * conn)68 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
69 {
70 	struct spdk_jsonrpc_request *request = NULL;
71 
72 	pthread_spin_lock(&conn->queue_lock);
73 	request = STAILQ_FIRST(&conn->send_queue);
74 	if (request) {
75 		STAILQ_REMOVE_HEAD(&conn->send_queue, link);
76 	}
77 	pthread_spin_unlock(&conn->queue_lock);
78 	return request;
79 }
80 
81 static void
jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn * conn)82 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn)
83 {
84 	struct spdk_jsonrpc_request *request;
85 
86 	jsonrpc_free_request(conn->send_request);
87 	conn->send_request = NULL ;
88 
89 	pthread_spin_lock(&conn->queue_lock);
90 	/* There might still be some requests being processed.
91 	 * We need to tell them that this connection is closed. */
92 	STAILQ_FOREACH(request, &conn->outstanding_queue, link) {
93 		request->conn = NULL;
94 	}
95 	pthread_spin_unlock(&conn->queue_lock);
96 
97 	while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) {
98 		jsonrpc_free_request(request);
99 	}
100 }
101 
102 static void
jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn * conn)103 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
104 {
105 	conn->closed = true;
106 
107 	if (conn->sockfd >= 0) {
108 		jsonrpc_server_free_conn_request(conn);
109 		close(conn->sockfd);
110 		conn->sockfd = -1;
111 
112 		if (conn->close_cb) {
113 			conn->close_cb(conn, conn->close_cb_ctx);
114 		}
115 	}
116 }
117 
118 void
spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server * server)119 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
120 {
121 	struct spdk_jsonrpc_server_conn *conn;
122 
123 	close(server->sockfd);
124 
125 	TAILQ_FOREACH(conn, &server->conns, link) {
126 		jsonrpc_server_conn_close(conn);
127 	}
128 
129 	free(server);
130 }
131 
132 static void
jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn * conn)133 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
134 {
135 	struct spdk_jsonrpc_server *server = conn->server;
136 
137 	jsonrpc_server_conn_close(conn);
138 
139 	pthread_spin_destroy(&conn->queue_lock);
140 	assert(STAILQ_EMPTY(&conn->send_queue));
141 
142 	TAILQ_REMOVE(&server->conns, conn, link);
143 	TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
144 }
145 
146 int
spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn * conn,spdk_jsonrpc_conn_closed_fn cb,void * ctx)147 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
148 			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
149 {
150 	int rc = 0;
151 
152 	pthread_spin_lock(&conn->queue_lock);
153 	if (conn->close_cb == NULL) {
154 		conn->close_cb = cb;
155 		conn->close_cb_ctx = ctx;
156 	} else {
157 		rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
158 	}
159 	pthread_spin_unlock(&conn->queue_lock);
160 
161 	return rc;
162 }
163 
164 int
spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn * conn,spdk_jsonrpc_conn_closed_fn cb,void * ctx)165 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
166 			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
167 {
168 	int rc = 0;
169 
170 	pthread_spin_lock(&conn->queue_lock);
171 	if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
172 		rc = -ENOENT;
173 	} else {
174 		conn->close_cb = NULL;
175 	}
176 	pthread_spin_unlock(&conn->queue_lock);
177 
178 	return rc;
179 }
180 
181 static int
jsonrpc_server_accept(struct spdk_jsonrpc_server * server)182 jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
183 {
184 	struct spdk_jsonrpc_server_conn *conn;
185 	int rc, flag;
186 
187 	rc = accept(server->sockfd, NULL, NULL);
188 	if (rc >= 0) {
189 		conn = TAILQ_FIRST(&server->free_conns);
190 		assert(conn != NULL);
191 
192 		conn->server = server;
193 		conn->sockfd = rc;
194 		conn->closed = false;
195 		conn->recv_len = 0;
196 		conn->outstanding_requests = 0;
197 		STAILQ_INIT(&conn->send_queue);
198 		STAILQ_INIT(&conn->outstanding_queue);
199 		conn->send_request = NULL;
200 
201 		if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) {
202 			SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd);
203 			close(conn->sockfd);
204 			return -1;
205 		}
206 
207 		flag = fcntl(conn->sockfd, F_GETFL);
208 		if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
209 			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
210 				    conn->sockfd, spdk_strerror(errno));
211 			close(conn->sockfd);
212 			pthread_spin_destroy(&conn->queue_lock);
213 			return -1;
214 		}
215 
216 		TAILQ_REMOVE(&server->free_conns, conn, link);
217 		TAILQ_INSERT_TAIL(&server->conns, conn, link);
218 		return 0;
219 	}
220 
221 	if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
222 		return 0;
223 	}
224 
225 	return -1;
226 }
227 
228 void
jsonrpc_server_handle_request(struct spdk_jsonrpc_request * request,const struct spdk_json_val * method,const struct spdk_json_val * params)229 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
230 			      const struct spdk_json_val *method, const struct spdk_json_val *params)
231 {
232 	request->conn->server->handle_request(request, method, params);
233 }
234 
235 void
jsonrpc_server_handle_error(struct spdk_jsonrpc_request * request,int error)236 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
237 {
238 	const char *msg;
239 
240 	switch (error) {
241 	case SPDK_JSONRPC_ERROR_PARSE_ERROR:
242 		msg = "Parse error";
243 		break;
244 
245 	case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
246 		msg = "Invalid request";
247 		break;
248 
249 	case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
250 		msg = "Method not found";
251 		break;
252 
253 	case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
254 		msg = "Invalid parameters";
255 		break;
256 
257 	case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
258 		msg = "Internal error";
259 		break;
260 
261 	default:
262 		msg = "Error";
263 		break;
264 	}
265 
266 	spdk_jsonrpc_send_error_response(request, error, msg);
267 }
268 
269 static int
jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn * conn)270 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
271 {
272 	ssize_t rc, offset;
273 	size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
274 
275 	rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
276 	if (rc == -1) {
277 		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
278 			return 0;
279 		}
280 		SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno));
281 		return -1;
282 	}
283 
284 	if (rc == 0) {
285 		SPDK_DEBUGLOG(rpc, "remote closed connection\n");
286 		conn->closed = true;
287 		return 0;
288 	}
289 
290 	conn->recv_len += rc;
291 
292 	offset = 0;
293 	do {
294 		rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
295 		if (rc < 0) {
296 			SPDK_ERRLOG("jsonrpc parse request failed\n");
297 			return -1;
298 		}
299 
300 		offset += rc;
301 	} while (rc > 0);
302 
303 	if (offset > 0) {
304 		/*
305 		 * Successfully parsed a requests - move any data past the end of the
306 		 * parsed requests down to the beginning.
307 		 */
308 		assert((size_t)offset <= conn->recv_len);
309 		memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
310 		conn->recv_len -= offset;
311 	}
312 
313 	return 0;
314 }
315 
316 void
jsonrpc_server_send_response(struct spdk_jsonrpc_request * request)317 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
318 {
319 	struct spdk_jsonrpc_server_conn *conn = request->conn;
320 
321 	if (conn == NULL) {
322 		/* We cannot respond to the request, because the connection is closed. */
323 		SPDK_WARNLOG("Unable to send response: connection closed.\n");
324 		jsonrpc_free_request(request);
325 		return;
326 	}
327 
328 	/* Queue the response to be sent */
329 	pthread_spin_lock(&conn->queue_lock);
330 	STAILQ_REMOVE(&conn->outstanding_queue, request, spdk_jsonrpc_request, link);
331 	STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
332 	pthread_spin_unlock(&conn->queue_lock);
333 }
334 
335 
336 static int
jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn * conn)337 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
338 {
339 	struct spdk_jsonrpc_request *request;
340 	ssize_t rc;
341 
342 more:
343 	if (conn->outstanding_requests == 0) {
344 		return 0;
345 	}
346 
347 	if (conn->send_request == NULL) {
348 		conn->send_request = jsonrpc_server_dequeue_request(conn);
349 	}
350 
351 	request = conn->send_request;
352 	if (request == NULL) {
353 		/* Nothing to send right now */
354 		return 0;
355 	}
356 
357 	if (request->send_offset == 0) {
358 		/* A byte for the null terminator is included in the send buffer. */
359 		request->send_buf[request->send_len] = '\0';
360 	}
361 
362 	if (request->send_len > 0) {
363 		rc = send(conn->sockfd, request->send_buf + request->send_offset,
364 			  request->send_len, 0);
365 		if (rc < 0) {
366 			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
367 				return 0;
368 			}
369 
370 			SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno));
371 			return -1;
372 		}
373 
374 		request->send_offset += rc;
375 		request->send_len -= rc;
376 	}
377 
378 	if (request->send_len == 0) {
379 		/*
380 		 * Full response has been sent.
381 		 * Free it and set send_request to NULL to move on to the next queued response.
382 		 */
383 		conn->send_request = NULL;
384 		jsonrpc_complete_request(request);
385 		goto more;
386 	}
387 
388 	return 0;
389 }
390 
391 int
spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server * server)392 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
393 {
394 	int rc;
395 	struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
396 
397 	TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
398 		/* If we can't receive and there are no outstanding requests close the connection. */
399 		if (conn->closed == true && conn->outstanding_requests == 0) {
400 			jsonrpc_server_conn_close(conn);
401 		}
402 
403 		if (conn->sockfd == -1 && conn->outstanding_requests == 0) {
404 			jsonrpc_server_conn_remove(conn);
405 		}
406 	}
407 
408 	/* Check listen socket */
409 	if (!TAILQ_EMPTY(&server->free_conns)) {
410 		jsonrpc_server_accept(server);
411 	}
412 
413 	TAILQ_FOREACH(conn, &server->conns, link) {
414 		if (conn->sockfd == -1) {
415 			continue;
416 		}
417 
418 		rc = jsonrpc_server_conn_send(conn);
419 		if (rc != 0) {
420 			jsonrpc_server_conn_close(conn);
421 			continue;
422 		}
423 
424 		if (!conn->closed) {
425 			rc = jsonrpc_server_conn_recv(conn);
426 			if (rc != 0) {
427 				jsonrpc_server_conn_close(conn);
428 			}
429 		}
430 	}
431 
432 	return 0;
433 }
434