1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright (C) 2016 Intel Corporation.
3 * All rights reserved.
4 */
5
6 #include "jsonrpc_internal.h"
7 #include "spdk/string.h"
8 #include "spdk/util.h"
9
10 struct spdk_jsonrpc_server *
spdk_jsonrpc_server_listen(int domain,int protocol,struct sockaddr * listen_addr,socklen_t addrlen,spdk_jsonrpc_handle_request_fn handle_request)11 spdk_jsonrpc_server_listen(int domain, int protocol,
12 struct sockaddr *listen_addr, socklen_t addrlen,
13 spdk_jsonrpc_handle_request_fn handle_request)
14 {
15 struct spdk_jsonrpc_server *server;
16 int rc, val, i;
17
18 server = calloc(1, sizeof(struct spdk_jsonrpc_server));
19 if (server == NULL) {
20 return NULL;
21 }
22
23 TAILQ_INIT(&server->free_conns);
24 TAILQ_INIT(&server->conns);
25
26 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
27 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
28 }
29
30 server->handle_request = handle_request;
31
32 server->sockfd = socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
33 if (server->sockfd < 0) {
34 SPDK_ERRLOG("socket() failed\n");
35 free(server);
36 return NULL;
37 }
38
39 val = 1;
40 rc = setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
41 if (rc != 0) {
42 SPDK_ERRLOG("could not set SO_REUSEADDR sock option: %s\n", spdk_strerror(errno));
43 close(server->sockfd);
44 free(server);
45 return NULL;
46 }
47
48 rc = bind(server->sockfd, listen_addr, addrlen);
49 if (rc != 0) {
50 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
51 close(server->sockfd);
52 free(server);
53 return NULL;
54 }
55
56 rc = listen(server->sockfd, 512);
57 if (rc != 0) {
58 SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
59 close(server->sockfd);
60 free(server);
61 return NULL;
62 }
63
64 return server;
65 }
66
67 static struct spdk_jsonrpc_request *
jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn * conn)68 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
69 {
70 struct spdk_jsonrpc_request *request = NULL;
71
72 pthread_spin_lock(&conn->queue_lock);
73 request = STAILQ_FIRST(&conn->send_queue);
74 if (request) {
75 STAILQ_REMOVE_HEAD(&conn->send_queue, link);
76 }
77 pthread_spin_unlock(&conn->queue_lock);
78 return request;
79 }
80
81 static void
jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn * conn)82 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn)
83 {
84 struct spdk_jsonrpc_request *request;
85
86 jsonrpc_free_request(conn->send_request);
87 conn->send_request = NULL ;
88
89 pthread_spin_lock(&conn->queue_lock);
90 /* There might still be some requests being processed.
91 * We need to tell them that this connection is closed. */
92 STAILQ_FOREACH(request, &conn->outstanding_queue, link) {
93 request->conn = NULL;
94 }
95 pthread_spin_unlock(&conn->queue_lock);
96
97 while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) {
98 jsonrpc_free_request(request);
99 }
100 }
101
102 static void
jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn * conn)103 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
104 {
105 conn->closed = true;
106
107 if (conn->sockfd >= 0) {
108 jsonrpc_server_free_conn_request(conn);
109 close(conn->sockfd);
110 conn->sockfd = -1;
111
112 if (conn->close_cb) {
113 conn->close_cb(conn, conn->close_cb_ctx);
114 }
115 }
116 }
117
118 void
spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server * server)119 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
120 {
121 struct spdk_jsonrpc_server_conn *conn;
122
123 close(server->sockfd);
124
125 TAILQ_FOREACH(conn, &server->conns, link) {
126 jsonrpc_server_conn_close(conn);
127 }
128
129 free(server);
130 }
131
132 static void
jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn * conn)133 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
134 {
135 struct spdk_jsonrpc_server *server = conn->server;
136
137 jsonrpc_server_conn_close(conn);
138
139 pthread_spin_destroy(&conn->queue_lock);
140 assert(STAILQ_EMPTY(&conn->send_queue));
141
142 TAILQ_REMOVE(&server->conns, conn, link);
143 TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
144 }
145
146 int
spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn * conn,spdk_jsonrpc_conn_closed_fn cb,void * ctx)147 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
148 spdk_jsonrpc_conn_closed_fn cb, void *ctx)
149 {
150 int rc = 0;
151
152 pthread_spin_lock(&conn->queue_lock);
153 if (conn->close_cb == NULL) {
154 conn->close_cb = cb;
155 conn->close_cb_ctx = ctx;
156 } else {
157 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
158 }
159 pthread_spin_unlock(&conn->queue_lock);
160
161 return rc;
162 }
163
164 int
spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn * conn,spdk_jsonrpc_conn_closed_fn cb,void * ctx)165 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
166 spdk_jsonrpc_conn_closed_fn cb, void *ctx)
167 {
168 int rc = 0;
169
170 pthread_spin_lock(&conn->queue_lock);
171 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
172 rc = -ENOENT;
173 } else {
174 conn->close_cb = NULL;
175 }
176 pthread_spin_unlock(&conn->queue_lock);
177
178 return rc;
179 }
180
181 static int
jsonrpc_server_accept(struct spdk_jsonrpc_server * server)182 jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
183 {
184 struct spdk_jsonrpc_server_conn *conn;
185 int rc, flag;
186
187 rc = accept(server->sockfd, NULL, NULL);
188 if (rc >= 0) {
189 conn = TAILQ_FIRST(&server->free_conns);
190 assert(conn != NULL);
191
192 conn->server = server;
193 conn->sockfd = rc;
194 conn->closed = false;
195 conn->recv_len = 0;
196 conn->outstanding_requests = 0;
197 STAILQ_INIT(&conn->send_queue);
198 STAILQ_INIT(&conn->outstanding_queue);
199 conn->send_request = NULL;
200
201 if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) {
202 SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd);
203 close(conn->sockfd);
204 return -1;
205 }
206
207 flag = fcntl(conn->sockfd, F_GETFL);
208 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
209 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
210 conn->sockfd, spdk_strerror(errno));
211 close(conn->sockfd);
212 pthread_spin_destroy(&conn->queue_lock);
213 return -1;
214 }
215
216 TAILQ_REMOVE(&server->free_conns, conn, link);
217 TAILQ_INSERT_TAIL(&server->conns, conn, link);
218 return 0;
219 }
220
221 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
222 return 0;
223 }
224
225 return -1;
226 }
227
228 void
jsonrpc_server_handle_request(struct spdk_jsonrpc_request * request,const struct spdk_json_val * method,const struct spdk_json_val * params)229 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
230 const struct spdk_json_val *method, const struct spdk_json_val *params)
231 {
232 request->conn->server->handle_request(request, method, params);
233 }
234
235 void
jsonrpc_server_handle_error(struct spdk_jsonrpc_request * request,int error)236 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
237 {
238 const char *msg;
239
240 switch (error) {
241 case SPDK_JSONRPC_ERROR_PARSE_ERROR:
242 msg = "Parse error";
243 break;
244
245 case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
246 msg = "Invalid request";
247 break;
248
249 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
250 msg = "Method not found";
251 break;
252
253 case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
254 msg = "Invalid parameters";
255 break;
256
257 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
258 msg = "Internal error";
259 break;
260
261 default:
262 msg = "Error";
263 break;
264 }
265
266 spdk_jsonrpc_send_error_response(request, error, msg);
267 }
268
269 static int
jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn * conn)270 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
271 {
272 ssize_t rc, offset;
273 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
274
275 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
276 if (rc == -1) {
277 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
278 return 0;
279 }
280 SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno));
281 return -1;
282 }
283
284 if (rc == 0) {
285 SPDK_DEBUGLOG(rpc, "remote closed connection\n");
286 conn->closed = true;
287 return 0;
288 }
289
290 conn->recv_len += rc;
291
292 offset = 0;
293 do {
294 rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
295 if (rc < 0) {
296 SPDK_ERRLOG("jsonrpc parse request failed\n");
297 return -1;
298 }
299
300 offset += rc;
301 } while (rc > 0);
302
303 if (offset > 0) {
304 /*
305 * Successfully parsed a requests - move any data past the end of the
306 * parsed requests down to the beginning.
307 */
308 assert((size_t)offset <= conn->recv_len);
309 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
310 conn->recv_len -= offset;
311 }
312
313 return 0;
314 }
315
316 void
jsonrpc_server_send_response(struct spdk_jsonrpc_request * request)317 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
318 {
319 struct spdk_jsonrpc_server_conn *conn = request->conn;
320
321 if (conn == NULL) {
322 /* We cannot respond to the request, because the connection is closed. */
323 SPDK_WARNLOG("Unable to send response: connection closed.\n");
324 jsonrpc_free_request(request);
325 return;
326 }
327
328 /* Queue the response to be sent */
329 pthread_spin_lock(&conn->queue_lock);
330 STAILQ_REMOVE(&conn->outstanding_queue, request, spdk_jsonrpc_request, link);
331 STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
332 pthread_spin_unlock(&conn->queue_lock);
333 }
334
335
336 static int
jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn * conn)337 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
338 {
339 struct spdk_jsonrpc_request *request;
340 ssize_t rc;
341
342 more:
343 if (conn->outstanding_requests == 0) {
344 return 0;
345 }
346
347 if (conn->send_request == NULL) {
348 conn->send_request = jsonrpc_server_dequeue_request(conn);
349 }
350
351 request = conn->send_request;
352 if (request == NULL) {
353 /* Nothing to send right now */
354 return 0;
355 }
356
357 if (request->send_offset == 0) {
358 /* A byte for the null terminator is included in the send buffer. */
359 request->send_buf[request->send_len] = '\0';
360 }
361
362 if (request->send_len > 0) {
363 rc = send(conn->sockfd, request->send_buf + request->send_offset,
364 request->send_len, 0);
365 if (rc < 0) {
366 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
367 return 0;
368 }
369
370 SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno));
371 return -1;
372 }
373
374 request->send_offset += rc;
375 request->send_len -= rc;
376 }
377
378 if (request->send_len == 0) {
379 /*
380 * Full response has been sent.
381 * Free it and set send_request to NULL to move on to the next queued response.
382 */
383 conn->send_request = NULL;
384 jsonrpc_complete_request(request);
385 goto more;
386 }
387
388 return 0;
389 }
390
391 int
spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server * server)392 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
393 {
394 int rc;
395 struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
396
397 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
398 /* If we can't receive and there are no outstanding requests close the connection. */
399 if (conn->closed == true && conn->outstanding_requests == 0) {
400 jsonrpc_server_conn_close(conn);
401 }
402
403 if (conn->sockfd == -1 && conn->outstanding_requests == 0) {
404 jsonrpc_server_conn_remove(conn);
405 }
406 }
407
408 /* Check listen socket */
409 if (!TAILQ_EMPTY(&server->free_conns)) {
410 jsonrpc_server_accept(server);
411 }
412
413 TAILQ_FOREACH(conn, &server->conns, link) {
414 if (conn->sockfd == -1) {
415 continue;
416 }
417
418 rc = jsonrpc_server_conn_send(conn);
419 if (rc != 0) {
420 jsonrpc_server_conn_close(conn);
421 continue;
422 }
423
424 if (!conn->closed) {
425 rc = jsonrpc_server_conn_recv(conn);
426 if (rc != 0) {
427 jsonrpc_server_conn_close(conn);
428 }
429 }
430 }
431
432 return 0;
433 }
434