1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "jsonrpc_internal.h" 35 #include "spdk/string.h" 36 #include "spdk/util.h" 37 38 struct spdk_jsonrpc_server * 39 spdk_jsonrpc_server_listen(int domain, int protocol, 40 struct sockaddr *listen_addr, socklen_t addrlen, 41 spdk_jsonrpc_handle_request_fn handle_request) 42 { 43 struct spdk_jsonrpc_server *server; 44 int rc, val, flag, i; 45 46 server = calloc(1, sizeof(struct spdk_jsonrpc_server)); 47 if (server == NULL) { 48 return NULL; 49 } 50 51 TAILQ_INIT(&server->free_conns); 52 TAILQ_INIT(&server->conns); 53 54 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) { 55 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link); 56 } 57 58 server->handle_request = handle_request; 59 60 server->sockfd = socket(domain, SOCK_STREAM, protocol); 61 if (server->sockfd < 0) { 62 SPDK_ERRLOG("socket() failed\n"); 63 free(server); 64 return NULL; 65 } 66 67 val = 1; 68 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 69 70 flag = fcntl(server->sockfd, F_GETFL); 71 if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 72 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 73 server->sockfd, spdk_strerror(errno)); 74 close(server->sockfd); 75 free(server); 76 return NULL; 77 } 78 79 rc = bind(server->sockfd, listen_addr, addrlen); 80 if (rc != 0) { 81 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno)); 82 close(server->sockfd); 83 free(server); 84 return NULL; 85 } 86 87 rc = listen(server->sockfd, 512); 88 if (rc != 0) { 89 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 90 close(server->sockfd); 91 free(server); 92 return NULL; 93 } 94 95 return server; 96 } 97 98 static struct spdk_jsonrpc_request * 99 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) 100 { 101 struct spdk_jsonrpc_request *request = NULL; 102 103 pthread_spin_lock(&conn->queue_lock); 104 request = STAILQ_FIRST(&conn->send_queue); 105 if (request) { 106 STAILQ_REMOVE_HEAD(&conn->send_queue, link); 107 } 108 pthread_spin_unlock(&conn->queue_lock); 109 return request; 110 } 111 112 static void 113 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn) 114 { 115 struct spdk_jsonrpc_request *request; 116 117 jsonrpc_free_request(conn->send_request); 118 conn->send_request = NULL ; 119 while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) { 120 jsonrpc_free_request(request); 121 } 122 } 123 124 static void 125 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) 126 { 127 conn->closed = true; 128 129 if (conn->sockfd >= 0) { 130 jsonrpc_server_free_conn_request(conn); 131 close(conn->sockfd); 132 conn->sockfd = -1; 133 134 if (conn->close_cb) { 135 conn->close_cb(conn, conn->close_cb_ctx); 136 } 137 } 138 } 139 140 void 141 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server) 142 { 143 struct spdk_jsonrpc_server_conn *conn; 144 145 close(server->sockfd); 146 147 TAILQ_FOREACH(conn, &server->conns, link) { 148 jsonrpc_server_conn_close(conn); 149 } 150 151 free(server); 152 } 153 154 static void 155 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) 156 { 157 struct spdk_jsonrpc_server *server = conn->server; 158 159 jsonrpc_server_conn_close(conn); 160 161 pthread_spin_destroy(&conn->queue_lock); 162 assert(STAILQ_EMPTY(&conn->send_queue)); 163 164 TAILQ_REMOVE(&server->conns, conn, link); 165 TAILQ_INSERT_HEAD(&server->free_conns, conn, link); 166 } 167 168 int 169 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn, 170 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 171 { 172 int rc = 0; 173 174 pthread_spin_lock(&conn->queue_lock); 175 if (conn->close_cb == NULL) { 176 conn->close_cb = cb; 177 conn->close_cb_ctx = ctx; 178 } else { 179 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC; 180 } 181 pthread_spin_unlock(&conn->queue_lock); 182 183 return rc; 184 } 185 186 int 187 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn, 188 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 189 { 190 int rc = 0; 191 192 pthread_spin_lock(&conn->queue_lock); 193 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) { 194 rc = -ENOENT; 195 } else { 196 conn->close_cb = NULL; 197 } 198 pthread_spin_unlock(&conn->queue_lock); 199 200 return rc; 201 } 202 203 static int 204 jsonrpc_server_accept(struct spdk_jsonrpc_server *server) 205 { 206 struct spdk_jsonrpc_server_conn *conn; 207 int rc, flag; 208 209 rc = accept(server->sockfd, NULL, NULL); 210 if (rc >= 0) { 211 conn = TAILQ_FIRST(&server->free_conns); 212 assert(conn != NULL); 213 214 conn->server = server; 215 conn->sockfd = rc; 216 conn->closed = false; 217 conn->recv_len = 0; 218 conn->outstanding_requests = 0; 219 pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE); 220 STAILQ_INIT(&conn->send_queue); 221 conn->send_request = NULL; 222 223 flag = fcntl(conn->sockfd, F_GETFL); 224 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 225 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 226 conn->sockfd, spdk_strerror(errno)); 227 close(conn->sockfd); 228 return -1; 229 } 230 231 TAILQ_REMOVE(&server->free_conns, conn, link); 232 TAILQ_INSERT_TAIL(&server->conns, conn, link); 233 return 0; 234 } 235 236 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 237 return 0; 238 } 239 240 return -1; 241 } 242 243 void 244 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request, 245 const struct spdk_json_val *method, const struct spdk_json_val *params) 246 { 247 request->conn->server->handle_request(request, method, params); 248 } 249 250 void 251 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error) 252 { 253 const char *msg; 254 255 switch (error) { 256 case SPDK_JSONRPC_ERROR_PARSE_ERROR: 257 msg = "Parse error"; 258 break; 259 260 case SPDK_JSONRPC_ERROR_INVALID_REQUEST: 261 msg = "Invalid request"; 262 break; 263 264 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND: 265 msg = "Method not found"; 266 break; 267 268 case SPDK_JSONRPC_ERROR_INVALID_PARAMS: 269 msg = "Invalid parameters"; 270 break; 271 272 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR: 273 msg = "Internal error"; 274 break; 275 276 default: 277 msg = "Error"; 278 break; 279 } 280 281 spdk_jsonrpc_send_error_response(request, error, msg); 282 } 283 284 static int 285 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn) 286 { 287 ssize_t rc, offset; 288 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len; 289 290 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0); 291 if (rc == -1) { 292 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 293 return 0; 294 } 295 SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno)); 296 return -1; 297 } 298 299 if (rc == 0) { 300 SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n"); 301 conn->closed = true; 302 return 0; 303 } 304 305 conn->recv_len += rc; 306 307 offset = 0; 308 do { 309 rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset); 310 if (rc < 0) { 311 SPDK_ERRLOG("jsonrpc parse request failed\n"); 312 return -1; 313 } 314 315 offset += rc; 316 } while (rc > 0); 317 318 if (offset > 0) { 319 /* 320 * Successfully parsed a requests - move any data past the end of the 321 * parsed requests down to the beginning. 322 */ 323 assert((size_t)offset <= conn->recv_len); 324 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset); 325 conn->recv_len -= offset; 326 } 327 328 return 0; 329 } 330 331 void 332 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request) 333 { 334 struct spdk_jsonrpc_server_conn *conn = request->conn; 335 336 /* Queue the response to be sent */ 337 pthread_spin_lock(&conn->queue_lock); 338 STAILQ_INSERT_TAIL(&conn->send_queue, request, link); 339 pthread_spin_unlock(&conn->queue_lock); 340 } 341 342 343 static int 344 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) 345 { 346 struct spdk_jsonrpc_request *request; 347 ssize_t rc; 348 349 more: 350 if (conn->outstanding_requests == 0) { 351 return 0; 352 } 353 354 if (conn->send_request == NULL) { 355 conn->send_request = jsonrpc_server_dequeue_request(conn); 356 } 357 358 request = conn->send_request; 359 if (request == NULL) { 360 /* Nothing to send right now */ 361 return 0; 362 } 363 364 if (request->send_len > 0) { 365 rc = send(conn->sockfd, request->send_buf + request->send_offset, 366 request->send_len, 0); 367 if (rc < 0) { 368 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 369 return 0; 370 } 371 372 SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno)); 373 return -1; 374 } 375 376 request->send_offset += rc; 377 request->send_len -= rc; 378 } 379 380 if (request->send_len == 0) { 381 /* 382 * Full response has been sent. 383 * Free it and set send_request to NULL to move on to the next queued response. 384 */ 385 conn->send_request = NULL; 386 jsonrpc_free_request(request); 387 goto more; 388 } 389 390 return 0; 391 } 392 393 int 394 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) 395 { 396 int rc; 397 struct spdk_jsonrpc_server_conn *conn, *conn_tmp; 398 399 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) { 400 /* If we can't receive and there are no outstanding requests close the connection. */ 401 if (conn->closed == true && conn->outstanding_requests == 0) { 402 jsonrpc_server_conn_close(conn); 403 } 404 405 if (conn->sockfd == -1 && conn->outstanding_requests == 0) { 406 jsonrpc_server_conn_remove(conn); 407 } 408 } 409 410 /* Check listen socket */ 411 if (!TAILQ_EMPTY(&server->free_conns)) { 412 jsonrpc_server_accept(server); 413 } 414 415 TAILQ_FOREACH(conn, &server->conns, link) { 416 if (conn->sockfd == -1) { 417 continue; 418 } 419 420 rc = jsonrpc_server_conn_send(conn); 421 if (rc != 0) { 422 jsonrpc_server_conn_close(conn); 423 continue; 424 } 425 426 if (!conn->closed) { 427 rc = jsonrpc_server_conn_recv(conn); 428 if (rc != 0) { 429 jsonrpc_server_conn_close(conn); 430 } 431 } 432 } 433 434 return 0; 435 } 436