1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "jsonrpc_internal.h" 35 #include "spdk/string.h" 36 #include "spdk/util.h" 37 38 struct spdk_jsonrpc_server * 39 spdk_jsonrpc_server_listen(int domain, int protocol, 40 struct sockaddr *listen_addr, socklen_t addrlen, 41 spdk_jsonrpc_handle_request_fn handle_request) 42 { 43 struct spdk_jsonrpc_server *server; 44 int rc, val, flag, i; 45 46 server = calloc(1, sizeof(struct spdk_jsonrpc_server)); 47 if (server == NULL) { 48 return NULL; 49 } 50 51 TAILQ_INIT(&server->free_conns); 52 TAILQ_INIT(&server->conns); 53 54 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) { 55 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link); 56 } 57 58 server->handle_request = handle_request; 59 60 server->sockfd = socket(domain, SOCK_STREAM, protocol); 61 if (server->sockfd < 0) { 62 SPDK_ERRLOG("socket() failed\n"); 63 free(server); 64 return NULL; 65 } 66 67 val = 1; 68 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 69 70 flag = fcntl(server->sockfd, F_GETFL); 71 if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 72 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 73 server->sockfd, spdk_strerror(errno)); 74 close(server->sockfd); 75 free(server); 76 return NULL; 77 } 78 79 rc = bind(server->sockfd, listen_addr, addrlen); 80 if (rc != 0) { 81 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno)); 82 close(server->sockfd); 83 free(server); 84 return NULL; 85 } 86 87 rc = listen(server->sockfd, 512); 88 if (rc != 0) { 89 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 90 close(server->sockfd); 91 free(server); 92 return NULL; 93 } 94 95 return server; 96 } 97 98 static struct spdk_jsonrpc_request * 99 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) 100 { 101 struct spdk_jsonrpc_request *request = NULL; 102 103 pthread_spin_lock(&conn->queue_lock); 104 request = STAILQ_FIRST(&conn->send_queue); 105 if (request) { 106 STAILQ_REMOVE_HEAD(&conn->send_queue, link); 107 } 108 pthread_spin_unlock(&conn->queue_lock); 109 return request; 110 } 111 112 static void 113 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn) 114 { 115 struct spdk_jsonrpc_request *request; 116 117 jsonrpc_free_request(conn->send_request); 118 conn->send_request = NULL ; 119 while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) { 120 jsonrpc_free_request(request); 121 } 122 } 123 124 static void 125 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) 126 { 127 conn->closed = true; 128 129 if (conn->sockfd >= 0) { 130 jsonrpc_server_free_conn_request(conn); 131 close(conn->sockfd); 132 conn->sockfd = -1; 133 134 if (conn->close_cb) { 135 conn->close_cb(conn, conn->close_cb_ctx); 136 } 137 } 138 } 139 140 void 141 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server) 142 { 143 struct spdk_jsonrpc_server_conn *conn; 144 145 close(server->sockfd); 146 147 TAILQ_FOREACH(conn, &server->conns, link) { 148 jsonrpc_server_conn_close(conn); 149 } 150 151 free(server); 152 } 153 154 static void 155 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) 156 { 157 struct spdk_jsonrpc_server *server = conn->server; 158 159 jsonrpc_server_conn_close(conn); 160 161 pthread_spin_destroy(&conn->queue_lock); 162 assert(STAILQ_EMPTY(&conn->send_queue)); 163 164 TAILQ_REMOVE(&server->conns, conn, link); 165 TAILQ_INSERT_HEAD(&server->free_conns, conn, link); 166 } 167 168 int 169 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn, 170 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 171 { 172 int rc = 0; 173 174 pthread_spin_lock(&conn->queue_lock); 175 if (conn->close_cb == NULL) { 176 conn->close_cb = cb; 177 conn->close_cb_ctx = ctx; 178 } else { 179 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC; 180 } 181 pthread_spin_unlock(&conn->queue_lock); 182 183 return rc; 184 } 185 186 int 187 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn, 188 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 189 { 190 int rc = 0; 191 192 pthread_spin_lock(&conn->queue_lock); 193 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) { 194 rc = -ENOENT; 195 } else { 196 conn->close_cb = NULL; 197 } 198 pthread_spin_unlock(&conn->queue_lock); 199 200 return rc; 201 } 202 203 static int 204 jsonrpc_server_accept(struct spdk_jsonrpc_server *server) 205 { 206 struct spdk_jsonrpc_server_conn *conn; 207 int rc, flag; 208 209 rc = accept(server->sockfd, NULL, NULL); 210 if (rc >= 0) { 211 conn = TAILQ_FIRST(&server->free_conns); 212 assert(conn != NULL); 213 214 conn->server = server; 215 conn->sockfd = rc; 216 conn->closed = false; 217 conn->recv_len = 0; 218 conn->outstanding_requests = 0; 219 STAILQ_INIT(&conn->send_queue); 220 conn->send_request = NULL; 221 222 if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) { 223 SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd); 224 close(conn->sockfd); 225 return -1; 226 } 227 228 flag = fcntl(conn->sockfd, F_GETFL); 229 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 230 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 231 conn->sockfd, spdk_strerror(errno)); 232 close(conn->sockfd); 233 pthread_spin_destroy(&conn->queue_lock); 234 return -1; 235 } 236 237 TAILQ_REMOVE(&server->free_conns, conn, link); 238 TAILQ_INSERT_TAIL(&server->conns, conn, link); 239 return 0; 240 } 241 242 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 243 return 0; 244 } 245 246 return -1; 247 } 248 249 void 250 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request, 251 const struct spdk_json_val *method, const struct spdk_json_val *params) 252 { 253 request->conn->server->handle_request(request, method, params); 254 } 255 256 void 257 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error) 258 { 259 const char *msg; 260 261 switch (error) { 262 case SPDK_JSONRPC_ERROR_PARSE_ERROR: 263 msg = "Parse error"; 264 break; 265 266 case SPDK_JSONRPC_ERROR_INVALID_REQUEST: 267 msg = "Invalid request"; 268 break; 269 270 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND: 271 msg = "Method not found"; 272 break; 273 274 case SPDK_JSONRPC_ERROR_INVALID_PARAMS: 275 msg = "Invalid parameters"; 276 break; 277 278 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR: 279 msg = "Internal error"; 280 break; 281 282 default: 283 msg = "Error"; 284 break; 285 } 286 287 spdk_jsonrpc_send_error_response(request, error, msg); 288 } 289 290 static int 291 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn) 292 { 293 ssize_t rc, offset; 294 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len; 295 296 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0); 297 if (rc == -1) { 298 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 299 return 0; 300 } 301 SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno)); 302 return -1; 303 } 304 305 if (rc == 0) { 306 SPDK_DEBUGLOG(rpc, "remote closed connection\n"); 307 conn->closed = true; 308 return 0; 309 } 310 311 conn->recv_len += rc; 312 313 offset = 0; 314 do { 315 rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset); 316 if (rc < 0) { 317 SPDK_ERRLOG("jsonrpc parse request failed\n"); 318 return -1; 319 } 320 321 offset += rc; 322 } while (rc > 0); 323 324 if (offset > 0) { 325 /* 326 * Successfully parsed a requests - move any data past the end of the 327 * parsed requests down to the beginning. 328 */ 329 assert((size_t)offset <= conn->recv_len); 330 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset); 331 conn->recv_len -= offset; 332 } 333 334 return 0; 335 } 336 337 void 338 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request) 339 { 340 struct spdk_jsonrpc_server_conn *conn = request->conn; 341 342 /* Queue the response to be sent */ 343 pthread_spin_lock(&conn->queue_lock); 344 STAILQ_INSERT_TAIL(&conn->send_queue, request, link); 345 pthread_spin_unlock(&conn->queue_lock); 346 } 347 348 349 static int 350 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) 351 { 352 struct spdk_jsonrpc_request *request; 353 ssize_t rc; 354 355 more: 356 if (conn->outstanding_requests == 0) { 357 return 0; 358 } 359 360 if (conn->send_request == NULL) { 361 conn->send_request = jsonrpc_server_dequeue_request(conn); 362 } 363 364 request = conn->send_request; 365 if (request == NULL) { 366 /* Nothing to send right now */ 367 return 0; 368 } 369 370 if (request->send_len > 0) { 371 rc = send(conn->sockfd, request->send_buf + request->send_offset, 372 request->send_len, 0); 373 if (rc < 0) { 374 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 375 return 0; 376 } 377 378 SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno)); 379 return -1; 380 } 381 382 request->send_offset += rc; 383 request->send_len -= rc; 384 } 385 386 if (request->send_len == 0) { 387 /* 388 * Full response has been sent. 389 * Free it and set send_request to NULL to move on to the next queued response. 390 */ 391 conn->send_request = NULL; 392 jsonrpc_free_request(request); 393 goto more; 394 } 395 396 return 0; 397 } 398 399 int 400 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) 401 { 402 int rc; 403 struct spdk_jsonrpc_server_conn *conn, *conn_tmp; 404 405 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) { 406 /* If we can't receive and there are no outstanding requests close the connection. */ 407 if (conn->closed == true && conn->outstanding_requests == 0) { 408 jsonrpc_server_conn_close(conn); 409 } 410 411 if (conn->sockfd == -1 && conn->outstanding_requests == 0) { 412 jsonrpc_server_conn_remove(conn); 413 } 414 } 415 416 /* Check listen socket */ 417 if (!TAILQ_EMPTY(&server->free_conns)) { 418 jsonrpc_server_accept(server); 419 } 420 421 TAILQ_FOREACH(conn, &server->conns, link) { 422 if (conn->sockfd == -1) { 423 continue; 424 } 425 426 rc = jsonrpc_server_conn_send(conn); 427 if (rc != 0) { 428 jsonrpc_server_conn_close(conn); 429 continue; 430 } 431 432 if (!conn->closed) { 433 rc = jsonrpc_server_conn_recv(conn); 434 if (rc != 0) { 435 jsonrpc_server_conn_close(conn); 436 } 437 } 438 } 439 440 return 0; 441 } 442