xref: /spdk/lib/jsonrpc/jsonrpc_server_tcp.c (revision 712a3f69d32632bf6c862f00200f7f437d3f7529)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "jsonrpc_internal.h"
35 #include "spdk/string.h"
36 #include "spdk/util.h"
37 
38 struct spdk_jsonrpc_server *
39 spdk_jsonrpc_server_listen(int domain, int protocol,
40 			   struct sockaddr *listen_addr, socklen_t addrlen,
41 			   spdk_jsonrpc_handle_request_fn handle_request)
42 {
43 	struct spdk_jsonrpc_server *server;
44 	int rc, val, flag, i;
45 
46 	server = calloc(1, sizeof(struct spdk_jsonrpc_server));
47 	if (server == NULL) {
48 		return NULL;
49 	}
50 
51 	TAILQ_INIT(&server->free_conns);
52 	TAILQ_INIT(&server->conns);
53 
54 	for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
55 		TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
56 	}
57 
58 	server->handle_request = handle_request;
59 
60 	server->sockfd = socket(domain, SOCK_STREAM, protocol);
61 	if (server->sockfd < 0) {
62 		SPDK_ERRLOG("socket() failed\n");
63 		free(server);
64 		return NULL;
65 	}
66 
67 	val = 1;
68 	setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
69 	if (protocol == IPPROTO_TCP) {
70 		setsockopt(server->sockfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
71 	}
72 
73 	flag = fcntl(server->sockfd, F_GETFL);
74 	if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
75 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
76 			    server->sockfd, spdk_strerror(errno));
77 		close(server->sockfd);
78 		free(server);
79 		return NULL;
80 	}
81 
82 	rc = bind(server->sockfd, listen_addr, addrlen);
83 	if (rc != 0) {
84 		SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
85 		close(server->sockfd);
86 		free(server);
87 		return NULL;
88 	}
89 
90 	rc = listen(server->sockfd, 512);
91 	if (rc != 0) {
92 		SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
93 		close(server->sockfd);
94 		free(server);
95 		return NULL;
96 	}
97 
98 	return server;
99 }
100 
101 static struct spdk_jsonrpc_request *
102 spdk_jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
103 {
104 	struct spdk_jsonrpc_request *request = NULL;
105 
106 	pthread_spin_lock(&conn->queue_lock);
107 	request = STAILQ_FIRST(&conn->send_queue);
108 	if (request) {
109 		STAILQ_REMOVE_HEAD(&conn->send_queue, link);
110 	}
111 	pthread_spin_unlock(&conn->queue_lock);
112 	return request;
113 }
114 
115 static void
116 spdk_jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn)
117 {
118 	struct spdk_jsonrpc_request *request;
119 
120 	spdk_jsonrpc_free_request(conn->send_request);
121 	conn->send_request = NULL ;
122 	while ((request = spdk_jsonrpc_server_dequeue_request(conn)) != NULL) {
123 		spdk_jsonrpc_free_request(request);
124 	}
125 }
126 
127 static void
128 spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
129 {
130 	conn->closed = true;
131 
132 	if (conn->sockfd >= 0) {
133 		spdk_jsonrpc_server_free_conn_request(conn);
134 		close(conn->sockfd);
135 		conn->sockfd = -1;
136 
137 		if (conn->close_cb) {
138 			conn->close_cb(conn, conn->close_cb_ctx);
139 		}
140 	}
141 }
142 
143 void
144 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
145 {
146 	struct spdk_jsonrpc_server_conn *conn;
147 
148 	close(server->sockfd);
149 
150 	TAILQ_FOREACH(conn, &server->conns, link) {
151 		spdk_jsonrpc_server_conn_close(conn);
152 	}
153 
154 	free(server);
155 }
156 
157 static void
158 spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
159 {
160 	struct spdk_jsonrpc_server *server = conn->server;
161 
162 	spdk_jsonrpc_server_conn_close(conn);
163 
164 	pthread_spin_destroy(&conn->queue_lock);
165 	assert(STAILQ_EMPTY(&conn->send_queue));
166 
167 	TAILQ_REMOVE(&server->conns, conn, link);
168 	TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
169 }
170 
171 int
172 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
173 			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
174 {
175 	int rc = 0;
176 
177 	pthread_spin_lock(&conn->queue_lock);
178 	if (conn->close_cb == NULL) {
179 		conn->close_cb = cb;
180 		conn->close_cb_ctx = ctx;
181 	} else {
182 		rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
183 	}
184 	pthread_spin_unlock(&conn->queue_lock);
185 
186 	return rc;
187 }
188 
189 int
190 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
191 			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
192 {
193 	int rc = 0;
194 
195 	pthread_spin_lock(&conn->queue_lock);
196 	if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
197 		rc = -ENOENT;
198 	} else {
199 		conn->close_cb = NULL;
200 	}
201 	pthread_spin_unlock(&conn->queue_lock);
202 
203 	return rc;
204 }
205 
206 static int
207 spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
208 {
209 	struct spdk_jsonrpc_server_conn *conn;
210 	int rc, flag;
211 
212 	rc = accept(server->sockfd, NULL, NULL);
213 	if (rc >= 0) {
214 		conn = TAILQ_FIRST(&server->free_conns);
215 		assert(conn != NULL);
216 
217 		conn->server = server;
218 		conn->sockfd = rc;
219 		conn->closed = false;
220 		conn->recv_len = 0;
221 		conn->outstanding_requests = 0;
222 		pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE);
223 		STAILQ_INIT(&conn->send_queue);
224 		conn->send_request = NULL;
225 
226 		flag = fcntl(conn->sockfd, F_GETFL);
227 		if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
228 			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
229 				    conn->sockfd, spdk_strerror(errno));
230 			close(conn->sockfd);
231 			return -1;
232 		}
233 
234 		TAILQ_REMOVE(&server->free_conns, conn, link);
235 		TAILQ_INSERT_TAIL(&server->conns, conn, link);
236 		return 0;
237 	}
238 
239 	if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
240 		return 0;
241 	}
242 
243 	return -1;
244 }
245 
246 void
247 spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
248 				   const struct spdk_json_val *method, const struct spdk_json_val *params)
249 {
250 	request->conn->server->handle_request(request, method, params);
251 }
252 
253 void
254 spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
255 {
256 	const char *msg;
257 
258 	switch (error) {
259 	case SPDK_JSONRPC_ERROR_PARSE_ERROR:
260 		msg = "Parse error";
261 		break;
262 
263 	case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
264 		msg = "Invalid request";
265 		break;
266 
267 	case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
268 		msg = "Method not found";
269 		break;
270 
271 	case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
272 		msg = "Invalid parameters";
273 		break;
274 
275 	case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
276 		msg = "Internal error";
277 		break;
278 
279 	default:
280 		msg = "Error";
281 		break;
282 	}
283 
284 	spdk_jsonrpc_send_error_response(request, error, msg);
285 }
286 
287 static int
288 spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
289 {
290 	ssize_t rc, offset;
291 	size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
292 
293 	rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
294 	if (rc == -1) {
295 		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
296 			return 0;
297 		}
298 		SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno));
299 		return -1;
300 	}
301 
302 	if (rc == 0) {
303 		SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n");
304 		conn->closed = true;
305 		return 0;
306 	}
307 
308 	conn->recv_len += rc;
309 
310 	offset = 0;
311 	do {
312 		rc = spdk_jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
313 		if (rc < 0) {
314 			SPDK_ERRLOG("jsonrpc parse request failed\n");
315 			return -1;
316 		}
317 
318 		offset += rc;
319 	} while (rc > 0);
320 
321 	if (offset > 0) {
322 		/*
323 		 * Successfully parsed a requests - move any data past the end of the
324 		 * parsed requests down to the beginning.
325 		 */
326 		assert((size_t)offset <= conn->recv_len);
327 		memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
328 		conn->recv_len -= offset;
329 	}
330 
331 	return 0;
332 }
333 
334 void
335 spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
336 {
337 	struct spdk_jsonrpc_server_conn *conn = request->conn;
338 
339 	/* Queue the response to be sent */
340 	pthread_spin_lock(&conn->queue_lock);
341 	STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
342 	pthread_spin_unlock(&conn->queue_lock);
343 }
344 
345 
346 static int
347 spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
348 {
349 	struct spdk_jsonrpc_request *request;
350 	ssize_t rc;
351 
352 more:
353 	if (conn->outstanding_requests == 0) {
354 		return 0;
355 	}
356 
357 	if (conn->send_request == NULL) {
358 		conn->send_request = spdk_jsonrpc_server_dequeue_request(conn);
359 	}
360 
361 	request = conn->send_request;
362 	if (request == NULL) {
363 		/* Nothing to send right now */
364 		return 0;
365 	}
366 
367 	if (request->send_len > 0) {
368 		rc = send(conn->sockfd, request->send_buf + request->send_offset,
369 			  request->send_len, 0);
370 		if (rc < 0) {
371 			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
372 				return 0;
373 			}
374 
375 			SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno));
376 			return -1;
377 		}
378 
379 		request->send_offset += rc;
380 		request->send_len -= rc;
381 	}
382 
383 	if (request->send_len == 0) {
384 		/*
385 		 * Full response has been sent.
386 		 * Free it and set send_request to NULL to move on to the next queued response.
387 		 */
388 		conn->send_request = NULL;
389 		spdk_jsonrpc_free_request(request);
390 		goto more;
391 	}
392 
393 	return 0;
394 }
395 
396 int
397 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
398 {
399 	int rc;
400 	struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
401 
402 	TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
403 		/* If we can't receive and there are no outstanding requests close the connection. */
404 		if (conn->closed == true && conn->outstanding_requests == 0) {
405 			spdk_jsonrpc_server_conn_close(conn);
406 		}
407 
408 		if (conn->sockfd == -1 && conn->outstanding_requests == 0) {
409 			spdk_jsonrpc_server_conn_remove(conn);
410 		}
411 	}
412 
413 	/* Check listen socket */
414 	if (!TAILQ_EMPTY(&server->free_conns)) {
415 		spdk_jsonrpc_server_accept(server);
416 	}
417 
418 	TAILQ_FOREACH(conn, &server->conns, link) {
419 		if (conn->sockfd == -1) {
420 			continue;
421 		}
422 
423 		rc = spdk_jsonrpc_server_conn_send(conn);
424 		if (rc != 0) {
425 			spdk_jsonrpc_server_conn_close(conn);
426 			continue;
427 		}
428 
429 		if (!conn->closed) {
430 			rc = spdk_jsonrpc_server_conn_recv(conn);
431 			if (rc != 0) {
432 				spdk_jsonrpc_server_conn_close(conn);
433 			}
434 		}
435 	}
436 
437 	return 0;
438 }
439