1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "jsonrpc_internal.h" 35 #include "spdk/string.h" 36 #include "spdk/util.h" 37 38 struct spdk_jsonrpc_server * 39 spdk_jsonrpc_server_listen(int domain, int protocol, 40 struct sockaddr *listen_addr, socklen_t addrlen, 41 spdk_jsonrpc_handle_request_fn handle_request) 42 { 43 struct spdk_jsonrpc_server *server; 44 int rc, val, flag, i; 45 46 server = calloc(1, sizeof(struct spdk_jsonrpc_server)); 47 if (server == NULL) { 48 return NULL; 49 } 50 51 TAILQ_INIT(&server->free_conns); 52 TAILQ_INIT(&server->conns); 53 54 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) { 55 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link); 56 } 57 58 server->handle_request = handle_request; 59 60 server->sockfd = socket(domain, SOCK_STREAM, protocol); 61 if (server->sockfd < 0) { 62 SPDK_ERRLOG("socket() failed\n"); 63 free(server); 64 return NULL; 65 } 66 67 val = 1; 68 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 69 if (protocol == IPPROTO_TCP) { 70 setsockopt(server->sockfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)); 71 } 72 73 flag = fcntl(server->sockfd, F_GETFL); 74 if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 75 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 76 server->sockfd, spdk_strerror(errno)); 77 close(server->sockfd); 78 free(server); 79 return NULL; 80 } 81 82 rc = bind(server->sockfd, listen_addr, addrlen); 83 if (rc != 0) { 84 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno)); 85 close(server->sockfd); 86 free(server); 87 return NULL; 88 } 89 90 rc = listen(server->sockfd, 512); 91 if (rc != 0) { 92 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 93 close(server->sockfd); 94 free(server); 95 return NULL; 96 } 97 98 return server; 99 } 100 101 static struct spdk_jsonrpc_request * 102 spdk_jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) 103 { 104 struct spdk_jsonrpc_request *request = NULL; 105 106 pthread_spin_lock(&conn->queue_lock); 107 request = STAILQ_FIRST(&conn->send_queue); 108 if (request) { 109 STAILQ_REMOVE_HEAD(&conn->send_queue, link); 110 } 111 pthread_spin_unlock(&conn->queue_lock); 112 return request; 113 } 114 115 static void 116 spdk_jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn) 117 { 118 struct spdk_jsonrpc_request *request; 119 120 spdk_jsonrpc_free_request(conn->send_request); 121 conn->send_request = NULL ; 122 while ((request = spdk_jsonrpc_server_dequeue_request(conn)) != NULL) { 123 spdk_jsonrpc_free_request(request); 124 } 125 } 126 127 static void 128 spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) 129 { 130 conn->closed = true; 131 132 if (conn->sockfd >= 0) { 133 spdk_jsonrpc_server_free_conn_request(conn); 134 close(conn->sockfd); 135 conn->sockfd = -1; 136 137 if (conn->close_cb) { 138 conn->close_cb(conn, conn->close_cb_ctx); 139 } 140 } 141 } 142 143 void 144 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server) 145 { 146 struct spdk_jsonrpc_server_conn *conn; 147 148 close(server->sockfd); 149 150 TAILQ_FOREACH(conn, &server->conns, link) { 151 spdk_jsonrpc_server_conn_close(conn); 152 } 153 154 free(server); 155 } 156 157 static void 158 spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) 159 { 160 struct spdk_jsonrpc_server *server = conn->server; 161 162 spdk_jsonrpc_server_conn_close(conn); 163 164 pthread_spin_destroy(&conn->queue_lock); 165 assert(STAILQ_EMPTY(&conn->send_queue)); 166 167 TAILQ_REMOVE(&server->conns, conn, link); 168 TAILQ_INSERT_HEAD(&server->free_conns, conn, link); 169 } 170 171 int 172 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn, 173 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 174 { 175 int rc = 0; 176 177 pthread_spin_lock(&conn->queue_lock); 178 if (conn->close_cb == NULL) { 179 conn->close_cb = cb; 180 conn->close_cb_ctx = ctx; 181 } else { 182 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC; 183 } 184 pthread_spin_unlock(&conn->queue_lock); 185 186 return rc; 187 } 188 189 int 190 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn, 191 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 192 { 193 int rc = 0; 194 195 pthread_spin_lock(&conn->queue_lock); 196 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) { 197 rc = -ENOENT; 198 } else { 199 conn->close_cb = NULL; 200 } 201 pthread_spin_unlock(&conn->queue_lock); 202 203 return rc; 204 } 205 206 static int 207 spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server) 208 { 209 struct spdk_jsonrpc_server_conn *conn; 210 int rc, flag; 211 212 rc = accept(server->sockfd, NULL, NULL); 213 if (rc >= 0) { 214 conn = TAILQ_FIRST(&server->free_conns); 215 assert(conn != NULL); 216 217 conn->server = server; 218 conn->sockfd = rc; 219 conn->closed = false; 220 conn->recv_len = 0; 221 conn->outstanding_requests = 0; 222 pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE); 223 STAILQ_INIT(&conn->send_queue); 224 conn->send_request = NULL; 225 226 flag = fcntl(conn->sockfd, F_GETFL); 227 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 228 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 229 conn->sockfd, spdk_strerror(errno)); 230 close(conn->sockfd); 231 return -1; 232 } 233 234 TAILQ_REMOVE(&server->free_conns, conn, link); 235 TAILQ_INSERT_TAIL(&server->conns, conn, link); 236 return 0; 237 } 238 239 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 240 return 0; 241 } 242 243 return -1; 244 } 245 246 void 247 spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request, 248 const struct spdk_json_val *method, const struct spdk_json_val *params) 249 { 250 request->conn->server->handle_request(request, method, params); 251 } 252 253 void 254 spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error) 255 { 256 const char *msg; 257 258 switch (error) { 259 case SPDK_JSONRPC_ERROR_PARSE_ERROR: 260 msg = "Parse error"; 261 break; 262 263 case SPDK_JSONRPC_ERROR_INVALID_REQUEST: 264 msg = "Invalid request"; 265 break; 266 267 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND: 268 msg = "Method not found"; 269 break; 270 271 case SPDK_JSONRPC_ERROR_INVALID_PARAMS: 272 msg = "Invalid parameters"; 273 break; 274 275 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR: 276 msg = "Internal error"; 277 break; 278 279 default: 280 msg = "Error"; 281 break; 282 } 283 284 spdk_jsonrpc_send_error_response(request, error, msg); 285 } 286 287 static int 288 spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn) 289 { 290 ssize_t rc, offset; 291 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len; 292 293 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0); 294 if (rc == -1) { 295 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 296 return 0; 297 } 298 SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno)); 299 return -1; 300 } 301 302 if (rc == 0) { 303 SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n"); 304 conn->closed = true; 305 return 0; 306 } 307 308 conn->recv_len += rc; 309 310 offset = 0; 311 do { 312 rc = spdk_jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset); 313 if (rc < 0) { 314 SPDK_ERRLOG("jsonrpc parse request failed\n"); 315 return -1; 316 } 317 318 offset += rc; 319 } while (rc > 0); 320 321 if (offset > 0) { 322 /* 323 * Successfully parsed a requests - move any data past the end of the 324 * parsed requests down to the beginning. 325 */ 326 assert((size_t)offset <= conn->recv_len); 327 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset); 328 conn->recv_len -= offset; 329 } 330 331 return 0; 332 } 333 334 void 335 spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_request *request) 336 { 337 struct spdk_jsonrpc_server_conn *conn = request->conn; 338 339 /* Queue the response to be sent */ 340 pthread_spin_lock(&conn->queue_lock); 341 STAILQ_INSERT_TAIL(&conn->send_queue, request, link); 342 pthread_spin_unlock(&conn->queue_lock); 343 } 344 345 346 static int 347 spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) 348 { 349 struct spdk_jsonrpc_request *request; 350 ssize_t rc; 351 352 more: 353 if (conn->outstanding_requests == 0) { 354 return 0; 355 } 356 357 if (conn->send_request == NULL) { 358 conn->send_request = spdk_jsonrpc_server_dequeue_request(conn); 359 } 360 361 request = conn->send_request; 362 if (request == NULL) { 363 /* Nothing to send right now */ 364 return 0; 365 } 366 367 if (request->send_len > 0) { 368 rc = send(conn->sockfd, request->send_buf + request->send_offset, 369 request->send_len, 0); 370 if (rc < 0) { 371 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 372 return 0; 373 } 374 375 SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno)); 376 return -1; 377 } 378 379 request->send_offset += rc; 380 request->send_len -= rc; 381 } 382 383 if (request->send_len == 0) { 384 /* 385 * Full response has been sent. 386 * Free it and set send_request to NULL to move on to the next queued response. 387 */ 388 conn->send_request = NULL; 389 spdk_jsonrpc_free_request(request); 390 goto more; 391 } 392 393 return 0; 394 } 395 396 int 397 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) 398 { 399 int rc; 400 struct spdk_jsonrpc_server_conn *conn, *conn_tmp; 401 402 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) { 403 /* If we can't receive and there are no outstanding requests close the connection. */ 404 if (conn->closed == true && conn->outstanding_requests == 0) { 405 spdk_jsonrpc_server_conn_close(conn); 406 } 407 408 if (conn->sockfd == -1 && conn->outstanding_requests == 0) { 409 spdk_jsonrpc_server_conn_remove(conn); 410 } 411 } 412 413 /* Check listen socket */ 414 if (!TAILQ_EMPTY(&server->free_conns)) { 415 spdk_jsonrpc_server_accept(server); 416 } 417 418 TAILQ_FOREACH(conn, &server->conns, link) { 419 if (conn->sockfd == -1) { 420 continue; 421 } 422 423 rc = spdk_jsonrpc_server_conn_send(conn); 424 if (rc != 0) { 425 spdk_jsonrpc_server_conn_close(conn); 426 continue; 427 } 428 429 if (!conn->closed) { 430 rc = spdk_jsonrpc_server_conn_recv(conn); 431 if (rc != 0) { 432 spdk_jsonrpc_server_conn_close(conn); 433 } 434 } 435 } 436 437 return 0; 438 } 439