xref: /spdk/lib/jsonrpc/jsonrpc_server_tcp.c (revision 0ed85362c8132a2d1927757fbcade66b6660d26a)
1  /*-
2   *   BSD LICENSE
3   *
4   *   Copyright (c) Intel Corporation.
5   *   All rights reserved.
6   *
7   *   Redistribution and use in source and binary forms, with or without
8   *   modification, are permitted provided that the following conditions
9   *   are met:
10   *
11   *     * Redistributions of source code must retain the above copyright
12   *       notice, this list of conditions and the following disclaimer.
13   *     * Redistributions in binary form must reproduce the above copyright
14   *       notice, this list of conditions and the following disclaimer in
15   *       the documentation and/or other materials provided with the
16   *       distribution.
17   *     * Neither the name of Intel Corporation nor the names of its
18   *       contributors may be used to endorse or promote products derived
19   *       from this software without specific prior written permission.
20   *
21   *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22   *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23   *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24   *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25   *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26   *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27   *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28   *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29   *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30   *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31   *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32   */
33  
34  #include "jsonrpc_internal.h"
35  #include "spdk/string.h"
36  #include "spdk/util.h"
37  
38  struct spdk_jsonrpc_server *
39  spdk_jsonrpc_server_listen(int domain, int protocol,
40  			   struct sockaddr *listen_addr, socklen_t addrlen,
41  			   spdk_jsonrpc_handle_request_fn handle_request)
42  {
43  	struct spdk_jsonrpc_server *server;
44  	int rc, val, flag, i;
45  
46  	server = calloc(1, sizeof(struct spdk_jsonrpc_server));
47  	if (server == NULL) {
48  		return NULL;
49  	}
50  
51  	TAILQ_INIT(&server->free_conns);
52  	TAILQ_INIT(&server->conns);
53  
54  	for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
55  		TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
56  	}
57  
58  	server->handle_request = handle_request;
59  
60  	server->sockfd = socket(domain, SOCK_STREAM, protocol);
61  	if (server->sockfd < 0) {
62  		SPDK_ERRLOG("socket() failed\n");
63  		free(server);
64  		return NULL;
65  	}
66  
67  	val = 1;
68  	setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
69  
70  	flag = fcntl(server->sockfd, F_GETFL);
71  	if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
72  		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
73  			    server->sockfd, spdk_strerror(errno));
74  		close(server->sockfd);
75  		free(server);
76  		return NULL;
77  	}
78  
79  	rc = bind(server->sockfd, listen_addr, addrlen);
80  	if (rc != 0) {
81  		SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
82  		close(server->sockfd);
83  		free(server);
84  		return NULL;
85  	}
86  
87  	rc = listen(server->sockfd, 512);
88  	if (rc != 0) {
89  		SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
90  		close(server->sockfd);
91  		free(server);
92  		return NULL;
93  	}
94  
95  	return server;
96  }
97  
98  static struct spdk_jsonrpc_request *
99  jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
100  {
101  	struct spdk_jsonrpc_request *request = NULL;
102  
103  	pthread_spin_lock(&conn->queue_lock);
104  	request = STAILQ_FIRST(&conn->send_queue);
105  	if (request) {
106  		STAILQ_REMOVE_HEAD(&conn->send_queue, link);
107  	}
108  	pthread_spin_unlock(&conn->queue_lock);
109  	return request;
110  }
111  
112  static void
113  jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn)
114  {
115  	struct spdk_jsonrpc_request *request;
116  
117  	jsonrpc_free_request(conn->send_request);
118  	conn->send_request = NULL ;
119  	while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) {
120  		jsonrpc_free_request(request);
121  	}
122  }
123  
124  static void
125  jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
126  {
127  	conn->closed = true;
128  
129  	if (conn->sockfd >= 0) {
130  		jsonrpc_server_free_conn_request(conn);
131  		close(conn->sockfd);
132  		conn->sockfd = -1;
133  
134  		if (conn->close_cb) {
135  			conn->close_cb(conn, conn->close_cb_ctx);
136  		}
137  	}
138  }
139  
140  void
141  spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
142  {
143  	struct spdk_jsonrpc_server_conn *conn;
144  
145  	close(server->sockfd);
146  
147  	TAILQ_FOREACH(conn, &server->conns, link) {
148  		jsonrpc_server_conn_close(conn);
149  	}
150  
151  	free(server);
152  }
153  
154  static void
155  jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
156  {
157  	struct spdk_jsonrpc_server *server = conn->server;
158  
159  	jsonrpc_server_conn_close(conn);
160  
161  	pthread_spin_destroy(&conn->queue_lock);
162  	assert(STAILQ_EMPTY(&conn->send_queue));
163  
164  	TAILQ_REMOVE(&server->conns, conn, link);
165  	TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
166  }
167  
168  int
169  spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
170  			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
171  {
172  	int rc = 0;
173  
174  	pthread_spin_lock(&conn->queue_lock);
175  	if (conn->close_cb == NULL) {
176  		conn->close_cb = cb;
177  		conn->close_cb_ctx = ctx;
178  	} else {
179  		rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
180  	}
181  	pthread_spin_unlock(&conn->queue_lock);
182  
183  	return rc;
184  }
185  
186  int
187  spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
188  			       spdk_jsonrpc_conn_closed_fn cb, void *ctx)
189  {
190  	int rc = 0;
191  
192  	pthread_spin_lock(&conn->queue_lock);
193  	if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
194  		rc = -ENOENT;
195  	} else {
196  		conn->close_cb = NULL;
197  	}
198  	pthread_spin_unlock(&conn->queue_lock);
199  
200  	return rc;
201  }
202  
203  static int
204  jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
205  {
206  	struct spdk_jsonrpc_server_conn *conn;
207  	int rc, flag;
208  
209  	rc = accept(server->sockfd, NULL, NULL);
210  	if (rc >= 0) {
211  		conn = TAILQ_FIRST(&server->free_conns);
212  		assert(conn != NULL);
213  
214  		conn->server = server;
215  		conn->sockfd = rc;
216  		conn->closed = false;
217  		conn->recv_len = 0;
218  		conn->outstanding_requests = 0;
219  		STAILQ_INIT(&conn->send_queue);
220  		conn->send_request = NULL;
221  
222  		if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) {
223  			SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd);
224  			close(conn->sockfd);
225  			return -1;
226  		}
227  
228  		flag = fcntl(conn->sockfd, F_GETFL);
229  		if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
230  			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
231  				    conn->sockfd, spdk_strerror(errno));
232  			close(conn->sockfd);
233  			pthread_spin_destroy(&conn->queue_lock);
234  			return -1;
235  		}
236  
237  		TAILQ_REMOVE(&server->free_conns, conn, link);
238  		TAILQ_INSERT_TAIL(&server->conns, conn, link);
239  		return 0;
240  	}
241  
242  	if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
243  		return 0;
244  	}
245  
246  	return -1;
247  }
248  
249  void
250  jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
251  			      const struct spdk_json_val *method, const struct spdk_json_val *params)
252  {
253  	request->conn->server->handle_request(request, method, params);
254  }
255  
256  void
257  jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
258  {
259  	const char *msg;
260  
261  	switch (error) {
262  	case SPDK_JSONRPC_ERROR_PARSE_ERROR:
263  		msg = "Parse error";
264  		break;
265  
266  	case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
267  		msg = "Invalid request";
268  		break;
269  
270  	case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
271  		msg = "Method not found";
272  		break;
273  
274  	case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
275  		msg = "Invalid parameters";
276  		break;
277  
278  	case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
279  		msg = "Internal error";
280  		break;
281  
282  	default:
283  		msg = "Error";
284  		break;
285  	}
286  
287  	spdk_jsonrpc_send_error_response(request, error, msg);
288  }
289  
290  static int
291  jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
292  {
293  	ssize_t rc, offset;
294  	size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
295  
296  	rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
297  	if (rc == -1) {
298  		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
299  			return 0;
300  		}
301  		SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno));
302  		return -1;
303  	}
304  
305  	if (rc == 0) {
306  		SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n");
307  		conn->closed = true;
308  		return 0;
309  	}
310  
311  	conn->recv_len += rc;
312  
313  	offset = 0;
314  	do {
315  		rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
316  		if (rc < 0) {
317  			SPDK_ERRLOG("jsonrpc parse request failed\n");
318  			return -1;
319  		}
320  
321  		offset += rc;
322  	} while (rc > 0);
323  
324  	if (offset > 0) {
325  		/*
326  		 * Successfully parsed a requests - move any data past the end of the
327  		 * parsed requests down to the beginning.
328  		 */
329  		assert((size_t)offset <= conn->recv_len);
330  		memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
331  		conn->recv_len -= offset;
332  	}
333  
334  	return 0;
335  }
336  
337  void
338  jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
339  {
340  	struct spdk_jsonrpc_server_conn *conn = request->conn;
341  
342  	/* Queue the response to be sent */
343  	pthread_spin_lock(&conn->queue_lock);
344  	STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
345  	pthread_spin_unlock(&conn->queue_lock);
346  }
347  
348  
349  static int
350  jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
351  {
352  	struct spdk_jsonrpc_request *request;
353  	ssize_t rc;
354  
355  more:
356  	if (conn->outstanding_requests == 0) {
357  		return 0;
358  	}
359  
360  	if (conn->send_request == NULL) {
361  		conn->send_request = jsonrpc_server_dequeue_request(conn);
362  	}
363  
364  	request = conn->send_request;
365  	if (request == NULL) {
366  		/* Nothing to send right now */
367  		return 0;
368  	}
369  
370  	if (request->send_len > 0) {
371  		rc = send(conn->sockfd, request->send_buf + request->send_offset,
372  			  request->send_len, 0);
373  		if (rc < 0) {
374  			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
375  				return 0;
376  			}
377  
378  			SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno));
379  			return -1;
380  		}
381  
382  		request->send_offset += rc;
383  		request->send_len -= rc;
384  	}
385  
386  	if (request->send_len == 0) {
387  		/*
388  		 * Full response has been sent.
389  		 * Free it and set send_request to NULL to move on to the next queued response.
390  		 */
391  		conn->send_request = NULL;
392  		jsonrpc_free_request(request);
393  		goto more;
394  	}
395  
396  	return 0;
397  }
398  
399  int
400  spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
401  {
402  	int rc;
403  	struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
404  
405  	TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
406  		/* If we can't receive and there are no outstanding requests close the connection. */
407  		if (conn->closed == true && conn->outstanding_requests == 0) {
408  			jsonrpc_server_conn_close(conn);
409  		}
410  
411  		if (conn->sockfd == -1 && conn->outstanding_requests == 0) {
412  			jsonrpc_server_conn_remove(conn);
413  		}
414  	}
415  
416  	/* Check listen socket */
417  	if (!TAILQ_EMPTY(&server->free_conns)) {
418  		jsonrpc_server_accept(server);
419  	}
420  
421  	TAILQ_FOREACH(conn, &server->conns, link) {
422  		if (conn->sockfd == -1) {
423  			continue;
424  		}
425  
426  		rc = jsonrpc_server_conn_send(conn);
427  		if (rc != 0) {
428  			jsonrpc_server_conn_close(conn);
429  			continue;
430  		}
431  
432  		if (!conn->closed) {
433  			rc = jsonrpc_server_conn_recv(conn);
434  			if (rc != 0) {
435  				jsonrpc_server_conn_close(conn);
436  			}
437  		}
438  	}
439  
440  	return 0;
441  }
442