1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "jsonrpc_internal.h" 35 #include "spdk/string.h" 36 #include "spdk/util.h" 37 38 struct spdk_jsonrpc_server * 39 spdk_jsonrpc_server_listen(int domain, int protocol, 40 struct sockaddr *listen_addr, socklen_t addrlen, 41 spdk_jsonrpc_handle_request_fn handle_request) 42 { 43 struct spdk_jsonrpc_server *server; 44 int rc, val, flag, i; 45 46 server = calloc(1, sizeof(struct spdk_jsonrpc_server)); 47 if (server == NULL) { 48 return NULL; 49 } 50 51 TAILQ_INIT(&server->free_conns); 52 TAILQ_INIT(&server->conns); 53 54 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) { 55 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link); 56 } 57 58 server->handle_request = handle_request; 59 60 server->sockfd = socket(domain, SOCK_STREAM, protocol); 61 if (server->sockfd < 0) { 62 SPDK_ERRLOG("socket() failed\n"); 63 free(server); 64 return NULL; 65 } 66 67 val = 1; 68 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 69 if (protocol == IPPROTO_TCP) { 70 setsockopt(server->sockfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)); 71 } 72 73 flag = fcntl(server->sockfd, F_GETFL); 74 if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 75 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 76 server->sockfd, spdk_strerror(errno)); 77 close(server->sockfd); 78 free(server); 79 return NULL; 80 } 81 82 rc = bind(server->sockfd, listen_addr, addrlen); 83 if (rc != 0) { 84 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno)); 85 close(server->sockfd); 86 free(server); 87 return NULL; 88 } 89 90 rc = listen(server->sockfd, 512); 91 if (rc != 0) { 92 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 93 close(server->sockfd); 94 free(server); 95 return NULL; 96 } 97 98 return server; 99 } 100 101 static void 102 spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) 103 { 104 conn->closed = true; 105 106 if (conn->sockfd >= 0) { 107 close(conn->sockfd); 108 conn->sockfd = -1; 109 110 if (conn->close_cb) { 111 conn->close_cb(conn, conn->close_cb_ctx); 112 } 113 } 114 } 115 116 void 117 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server) 118 { 119 struct spdk_jsonrpc_server_conn *conn; 120 121 close(server->sockfd); 122 123 TAILQ_FOREACH(conn, &server->conns, link) { 124 spdk_jsonrpc_server_conn_close(conn); 125 } 126 127 free(server); 128 } 129 130 static void 131 spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) 132 { 133 struct spdk_jsonrpc_server *server = conn->server; 134 135 spdk_jsonrpc_server_conn_close(conn); 136 137 pthread_spin_destroy(&conn->queue_lock); 138 assert(STAILQ_EMPTY(&conn->send_queue)); 139 140 TAILQ_REMOVE(&server->conns, conn, link); 141 TAILQ_INSERT_HEAD(&server->free_conns, conn, link); 142 } 143 144 int 145 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn, 146 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 147 { 148 int rc = 0; 149 150 pthread_spin_lock(&conn->queue_lock); 151 if (conn->close_cb == NULL) { 152 conn->close_cb = cb; 153 conn->close_cb_ctx = ctx; 154 } else { 155 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC; 156 } 157 pthread_spin_unlock(&conn->queue_lock); 158 159 return rc; 160 } 161 162 int 163 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn, 164 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 165 { 166 int rc = 0; 167 168 pthread_spin_lock(&conn->queue_lock); 169 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) { 170 rc = -ENOENT; 171 } else { 172 conn->close_cb = NULL; 173 } 174 pthread_spin_unlock(&conn->queue_lock); 175 176 return rc; 177 } 178 179 static int 180 spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server) 181 { 182 struct spdk_jsonrpc_server_conn *conn; 183 int rc, flag; 184 185 rc = accept(server->sockfd, NULL, NULL); 186 if (rc >= 0) { 187 conn = TAILQ_FIRST(&server->free_conns); 188 assert(conn != NULL); 189 190 conn->server = server; 191 conn->sockfd = rc; 192 conn->closed = false; 193 conn->recv_len = 0; 194 conn->outstanding_requests = 0; 195 pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE); 196 STAILQ_INIT(&conn->send_queue); 197 conn->send_request = NULL; 198 199 flag = fcntl(conn->sockfd, F_GETFL); 200 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 201 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 202 conn->sockfd, spdk_strerror(errno)); 203 close(conn->sockfd); 204 return -1; 205 } 206 207 TAILQ_REMOVE(&server->free_conns, conn, link); 208 TAILQ_INSERT_TAIL(&server->conns, conn, link); 209 return 0; 210 } 211 212 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 213 return 0; 214 } 215 216 return -1; 217 } 218 219 void 220 spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request, 221 const struct spdk_json_val *method, const struct spdk_json_val *params) 222 { 223 request->conn->server->handle_request(request, method, params); 224 } 225 226 void 227 spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error) 228 { 229 const char *msg; 230 231 switch (error) { 232 case SPDK_JSONRPC_ERROR_PARSE_ERROR: 233 msg = "Parse error"; 234 break; 235 236 case SPDK_JSONRPC_ERROR_INVALID_REQUEST: 237 msg = "Invalid request"; 238 break; 239 240 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND: 241 msg = "Method not found"; 242 break; 243 244 case SPDK_JSONRPC_ERROR_INVALID_PARAMS: 245 msg = "Invalid parameters"; 246 break; 247 248 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR: 249 msg = "Internal error"; 250 break; 251 252 default: 253 msg = "Error"; 254 break; 255 } 256 257 spdk_jsonrpc_send_error_response(request, error, msg); 258 } 259 260 static int 261 spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn) 262 { 263 ssize_t rc, offset; 264 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len; 265 266 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0); 267 if (rc == -1) { 268 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 269 return 0; 270 } 271 SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno)); 272 return -1; 273 } 274 275 if (rc == 0) { 276 SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n"); 277 conn->closed = true; 278 return 0; 279 } 280 281 conn->recv_len += rc; 282 283 offset = 0; 284 do { 285 rc = spdk_jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset); 286 if (rc < 0) { 287 SPDK_ERRLOG("jsonrpc parse request failed\n"); 288 return -1; 289 } 290 291 offset += rc; 292 } while (rc > 0); 293 294 if (offset > 0) { 295 /* 296 * Successfully parsed a requests - move any data past the end of the 297 * parsed requests down to the beginning. 298 */ 299 assert((size_t)offset <= conn->recv_len); 300 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset); 301 conn->recv_len -= offset; 302 } 303 304 return 0; 305 } 306 307 void 308 spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_request *request) 309 { 310 struct spdk_jsonrpc_server_conn *conn = request->conn; 311 312 /* Queue the response to be sent */ 313 pthread_spin_lock(&conn->queue_lock); 314 STAILQ_INSERT_TAIL(&conn->send_queue, request, link); 315 pthread_spin_unlock(&conn->queue_lock); 316 } 317 318 static struct spdk_jsonrpc_request * 319 spdk_jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) 320 { 321 struct spdk_jsonrpc_request *request = NULL; 322 323 pthread_spin_lock(&conn->queue_lock); 324 request = STAILQ_FIRST(&conn->send_queue); 325 if (request) { 326 STAILQ_REMOVE_HEAD(&conn->send_queue, link); 327 } 328 pthread_spin_unlock(&conn->queue_lock); 329 return request; 330 } 331 332 333 static int 334 spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) 335 { 336 struct spdk_jsonrpc_request *request; 337 ssize_t rc; 338 339 more: 340 if (conn->outstanding_requests == 0) { 341 return 0; 342 } 343 344 if (conn->send_request == NULL) { 345 conn->send_request = spdk_jsonrpc_server_dequeue_request(conn); 346 } 347 348 request = conn->send_request; 349 if (request == NULL) { 350 /* Nothing to send right now */ 351 return 0; 352 } 353 354 if (request->send_len > 0) { 355 rc = send(conn->sockfd, request->send_buf + request->send_offset, 356 request->send_len, 0); 357 if (rc < 0) { 358 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 359 return 0; 360 } 361 362 SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno)); 363 return -1; 364 } 365 366 request->send_offset += rc; 367 request->send_len -= rc; 368 } 369 370 if (request->send_len == 0) { 371 /* 372 * Full response has been sent. 373 * Free it and set send_request to NULL to move on to the next queued response. 374 */ 375 conn->send_request = NULL; 376 spdk_jsonrpc_free_request(request); 377 goto more; 378 } 379 380 return 0; 381 } 382 383 int 384 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) 385 { 386 int rc; 387 struct spdk_jsonrpc_server_conn *conn, *conn_tmp; 388 389 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) { 390 /* If we can't receive and there are no outstanding requests close the connection. */ 391 if (conn->closed == true && conn->outstanding_requests == 0) { 392 spdk_jsonrpc_server_conn_close(conn); 393 } 394 395 if (conn->sockfd == -1) { 396 struct spdk_jsonrpc_request *request; 397 398 /* 399 * The client closed the connection, but there may still be requests 400 * outstanding; we have no way to cancel outstanding requests, so wait until 401 * each outstanding request sends a response (which will be discarded, since 402 * the connection is closed). 403 */ 404 405 spdk_jsonrpc_free_request(conn->send_request); 406 conn->send_request = NULL; 407 408 while ((request = spdk_jsonrpc_server_dequeue_request(conn)) != NULL) { 409 spdk_jsonrpc_free_request(request); 410 } 411 412 if (conn->outstanding_requests == 0) { 413 SPDK_DEBUGLOG(SPDK_LOG_RPC, "all outstanding requests completed\n"); 414 spdk_jsonrpc_server_conn_remove(conn); 415 } 416 } 417 } 418 419 /* Check listen socket */ 420 if (!TAILQ_EMPTY(&server->free_conns)) { 421 spdk_jsonrpc_server_accept(server); 422 } 423 424 TAILQ_FOREACH(conn, &server->conns, link) { 425 if (conn->sockfd == -1) { 426 continue; 427 } 428 429 rc = spdk_jsonrpc_server_conn_send(conn); 430 if (rc != 0) { 431 spdk_jsonrpc_server_conn_close(conn); 432 continue; 433 } 434 435 if (!conn->closed) { 436 rc = spdk_jsonrpc_server_conn_recv(conn); 437 if (rc != 0) { 438 spdk_jsonrpc_server_conn_close(conn); 439 } 440 } 441 } 442 443 return 0; 444 } 445