1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (c) Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "jsonrpc_internal.h" 7 #include "spdk/string.h" 8 #include "spdk/util.h" 9 10 struct spdk_jsonrpc_server * 11 spdk_jsonrpc_server_listen(int domain, int protocol, 12 struct sockaddr *listen_addr, socklen_t addrlen, 13 spdk_jsonrpc_handle_request_fn handle_request) 14 { 15 struct spdk_jsonrpc_server *server; 16 int rc, val, i; 17 18 server = calloc(1, sizeof(struct spdk_jsonrpc_server)); 19 if (server == NULL) { 20 return NULL; 21 } 22 23 TAILQ_INIT(&server->free_conns); 24 TAILQ_INIT(&server->conns); 25 26 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) { 27 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link); 28 } 29 30 server->handle_request = handle_request; 31 32 server->sockfd = socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol); 33 if (server->sockfd < 0) { 34 SPDK_ERRLOG("socket() failed\n"); 35 free(server); 36 return NULL; 37 } 38 39 val = 1; 40 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 41 42 rc = bind(server->sockfd, listen_addr, addrlen); 43 if (rc != 0) { 44 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno)); 45 close(server->sockfd); 46 free(server); 47 return NULL; 48 } 49 50 rc = listen(server->sockfd, 512); 51 if (rc != 0) { 52 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 53 close(server->sockfd); 54 free(server); 55 return NULL; 56 } 57 58 return server; 59 } 60 61 static struct spdk_jsonrpc_request * 62 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) 63 { 64 struct spdk_jsonrpc_request *request = NULL; 65 66 pthread_spin_lock(&conn->queue_lock); 67 request = STAILQ_FIRST(&conn->send_queue); 68 if (request) { 69 STAILQ_REMOVE_HEAD(&conn->send_queue, link); 70 } 71 pthread_spin_unlock(&conn->queue_lock); 72 return request; 73 } 74 75 static void 76 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn) 77 { 78 struct spdk_jsonrpc_request *request; 79 80 jsonrpc_free_request(conn->send_request); 81 conn->send_request = NULL ; 82 while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) { 83 jsonrpc_free_request(request); 84 } 85 } 86 87 static void 88 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) 89 { 90 conn->closed = true; 91 92 if (conn->sockfd >= 0) { 93 jsonrpc_server_free_conn_request(conn); 94 close(conn->sockfd); 95 conn->sockfd = -1; 96 97 if (conn->close_cb) { 98 conn->close_cb(conn, conn->close_cb_ctx); 99 } 100 } 101 } 102 103 void 104 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server) 105 { 106 struct spdk_jsonrpc_server_conn *conn; 107 108 close(server->sockfd); 109 110 TAILQ_FOREACH(conn, &server->conns, link) { 111 jsonrpc_server_conn_close(conn); 112 } 113 114 free(server); 115 } 116 117 static void 118 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) 119 { 120 struct spdk_jsonrpc_server *server = conn->server; 121 122 jsonrpc_server_conn_close(conn); 123 124 pthread_spin_destroy(&conn->queue_lock); 125 assert(STAILQ_EMPTY(&conn->send_queue)); 126 127 TAILQ_REMOVE(&server->conns, conn, link); 128 TAILQ_INSERT_HEAD(&server->free_conns, conn, link); 129 } 130 131 int 132 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn, 133 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 134 { 135 int rc = 0; 136 137 pthread_spin_lock(&conn->queue_lock); 138 if (conn->close_cb == NULL) { 139 conn->close_cb = cb; 140 conn->close_cb_ctx = ctx; 141 } else { 142 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC; 143 } 144 pthread_spin_unlock(&conn->queue_lock); 145 146 return rc; 147 } 148 149 int 150 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn, 151 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 152 { 153 int rc = 0; 154 155 pthread_spin_lock(&conn->queue_lock); 156 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) { 157 rc = -ENOENT; 158 } else { 159 conn->close_cb = NULL; 160 } 161 pthread_spin_unlock(&conn->queue_lock); 162 163 return rc; 164 } 165 166 static int 167 jsonrpc_server_accept(struct spdk_jsonrpc_server *server) 168 { 169 struct spdk_jsonrpc_server_conn *conn; 170 int rc, flag; 171 172 rc = accept(server->sockfd, NULL, NULL); 173 if (rc >= 0) { 174 conn = TAILQ_FIRST(&server->free_conns); 175 assert(conn != NULL); 176 177 conn->server = server; 178 conn->sockfd = rc; 179 conn->closed = false; 180 conn->recv_len = 0; 181 conn->outstanding_requests = 0; 182 STAILQ_INIT(&conn->send_queue); 183 conn->send_request = NULL; 184 185 if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) { 186 SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd); 187 close(conn->sockfd); 188 return -1; 189 } 190 191 flag = fcntl(conn->sockfd, F_GETFL); 192 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 193 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 194 conn->sockfd, spdk_strerror(errno)); 195 close(conn->sockfd); 196 pthread_spin_destroy(&conn->queue_lock); 197 return -1; 198 } 199 200 TAILQ_REMOVE(&server->free_conns, conn, link); 201 TAILQ_INSERT_TAIL(&server->conns, conn, link); 202 return 0; 203 } 204 205 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 206 return 0; 207 } 208 209 return -1; 210 } 211 212 void 213 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request, 214 const struct spdk_json_val *method, const struct spdk_json_val *params) 215 { 216 request->conn->server->handle_request(request, method, params); 217 } 218 219 void 220 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error) 221 { 222 const char *msg; 223 224 switch (error) { 225 case SPDK_JSONRPC_ERROR_PARSE_ERROR: 226 msg = "Parse error"; 227 break; 228 229 case SPDK_JSONRPC_ERROR_INVALID_REQUEST: 230 msg = "Invalid request"; 231 break; 232 233 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND: 234 msg = "Method not found"; 235 break; 236 237 case SPDK_JSONRPC_ERROR_INVALID_PARAMS: 238 msg = "Invalid parameters"; 239 break; 240 241 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR: 242 msg = "Internal error"; 243 break; 244 245 default: 246 msg = "Error"; 247 break; 248 } 249 250 spdk_jsonrpc_send_error_response(request, error, msg); 251 } 252 253 static int 254 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn) 255 { 256 ssize_t rc, offset; 257 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len; 258 259 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0); 260 if (rc == -1) { 261 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 262 return 0; 263 } 264 SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno)); 265 return -1; 266 } 267 268 if (rc == 0) { 269 SPDK_DEBUGLOG(rpc, "remote closed connection\n"); 270 conn->closed = true; 271 return 0; 272 } 273 274 conn->recv_len += rc; 275 276 offset = 0; 277 do { 278 rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset); 279 if (rc < 0) { 280 SPDK_ERRLOG("jsonrpc parse request failed\n"); 281 return -1; 282 } 283 284 offset += rc; 285 } while (rc > 0); 286 287 if (offset > 0) { 288 /* 289 * Successfully parsed a requests - move any data past the end of the 290 * parsed requests down to the beginning. 291 */ 292 assert((size_t)offset <= conn->recv_len); 293 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset); 294 conn->recv_len -= offset; 295 } 296 297 return 0; 298 } 299 300 void 301 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request) 302 { 303 struct spdk_jsonrpc_server_conn *conn = request->conn; 304 305 /* Queue the response to be sent */ 306 pthread_spin_lock(&conn->queue_lock); 307 STAILQ_INSERT_TAIL(&conn->send_queue, request, link); 308 pthread_spin_unlock(&conn->queue_lock); 309 } 310 311 312 static int 313 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) 314 { 315 struct spdk_jsonrpc_request *request; 316 ssize_t rc; 317 318 more: 319 if (conn->outstanding_requests == 0) { 320 return 0; 321 } 322 323 if (conn->send_request == NULL) { 324 conn->send_request = jsonrpc_server_dequeue_request(conn); 325 } 326 327 request = conn->send_request; 328 if (request == NULL) { 329 /* Nothing to send right now */ 330 return 0; 331 } 332 333 if (request->send_len > 0) { 334 rc = send(conn->sockfd, request->send_buf + request->send_offset, 335 request->send_len, 0); 336 if (rc < 0) { 337 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 338 return 0; 339 } 340 341 SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno)); 342 return -1; 343 } 344 345 request->send_offset += rc; 346 request->send_len -= rc; 347 } 348 349 if (request->send_len == 0) { 350 /* 351 * Full response has been sent. 352 * Free it and set send_request to NULL to move on to the next queued response. 353 */ 354 conn->send_request = NULL; 355 jsonrpc_free_request(request); 356 goto more; 357 } 358 359 return 0; 360 } 361 362 int 363 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) 364 { 365 int rc; 366 struct spdk_jsonrpc_server_conn *conn, *conn_tmp; 367 368 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) { 369 /* If we can't receive and there are no outstanding requests close the connection. */ 370 if (conn->closed == true && conn->outstanding_requests == 0) { 371 jsonrpc_server_conn_close(conn); 372 } 373 374 if (conn->sockfd == -1 && conn->outstanding_requests == 0) { 375 jsonrpc_server_conn_remove(conn); 376 } 377 } 378 379 /* Check listen socket */ 380 if (!TAILQ_EMPTY(&server->free_conns)) { 381 jsonrpc_server_accept(server); 382 } 383 384 TAILQ_FOREACH(conn, &server->conns, link) { 385 if (conn->sockfd == -1) { 386 continue; 387 } 388 389 rc = jsonrpc_server_conn_send(conn); 390 if (rc != 0) { 391 jsonrpc_server_conn_close(conn); 392 continue; 393 } 394 395 if (!conn->closed) { 396 rc = jsonrpc_server_conn_recv(conn); 397 if (rc != 0) { 398 jsonrpc_server_conn_close(conn); 399 } 400 } 401 } 402 403 return 0; 404 } 405