1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "jsonrpc_internal.h" 35 #include "spdk/string.h" 36 #include "spdk/util.h" 37 38 struct spdk_jsonrpc_server * 39 spdk_jsonrpc_server_listen(int domain, int protocol, 40 struct sockaddr *listen_addr, socklen_t addrlen, 41 spdk_jsonrpc_handle_request_fn handle_request) 42 { 43 struct spdk_jsonrpc_server *server; 44 int rc, val, i; 45 46 server = calloc(1, sizeof(struct spdk_jsonrpc_server)); 47 if (server == NULL) { 48 return NULL; 49 } 50 51 TAILQ_INIT(&server->free_conns); 52 TAILQ_INIT(&server->conns); 53 54 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) { 55 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link); 56 } 57 58 server->handle_request = handle_request; 59 60 server->sockfd = socket(domain, SOCK_STREAM | SOCK_NONBLOCK, protocol); 61 if (server->sockfd < 0) { 62 SPDK_ERRLOG("socket() failed\n"); 63 free(server); 64 return NULL; 65 } 66 67 val = 1; 68 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 69 70 rc = bind(server->sockfd, listen_addr, addrlen); 71 if (rc != 0) { 72 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno)); 73 close(server->sockfd); 74 free(server); 75 return NULL; 76 } 77 78 rc = listen(server->sockfd, 512); 79 if (rc != 0) { 80 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 81 close(server->sockfd); 82 free(server); 83 return NULL; 84 } 85 86 return server; 87 } 88 89 static struct spdk_jsonrpc_request * 90 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) 91 { 92 struct spdk_jsonrpc_request *request = NULL; 93 94 pthread_spin_lock(&conn->queue_lock); 95 request = STAILQ_FIRST(&conn->send_queue); 96 if (request) { 97 STAILQ_REMOVE_HEAD(&conn->send_queue, link); 98 } 99 pthread_spin_unlock(&conn->queue_lock); 100 return request; 101 } 102 103 static void 104 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn) 105 { 106 struct spdk_jsonrpc_request *request; 107 108 jsonrpc_free_request(conn->send_request); 109 conn->send_request = NULL ; 110 while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) { 111 jsonrpc_free_request(request); 112 } 113 } 114 115 static void 116 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) 117 { 118 conn->closed = true; 119 120 if (conn->sockfd >= 0) { 121 jsonrpc_server_free_conn_request(conn); 122 close(conn->sockfd); 123 conn->sockfd = -1; 124 125 if (conn->close_cb) { 126 conn->close_cb(conn, conn->close_cb_ctx); 127 } 128 } 129 } 130 131 void 132 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server) 133 { 134 struct spdk_jsonrpc_server_conn *conn; 135 136 close(server->sockfd); 137 138 TAILQ_FOREACH(conn, &server->conns, link) { 139 jsonrpc_server_conn_close(conn); 140 } 141 142 free(server); 143 } 144 145 static void 146 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) 147 { 148 struct spdk_jsonrpc_server *server = conn->server; 149 150 jsonrpc_server_conn_close(conn); 151 152 pthread_spin_destroy(&conn->queue_lock); 153 assert(STAILQ_EMPTY(&conn->send_queue)); 154 155 TAILQ_REMOVE(&server->conns, conn, link); 156 TAILQ_INSERT_HEAD(&server->free_conns, conn, link); 157 } 158 159 int 160 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn, 161 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 162 { 163 int rc = 0; 164 165 pthread_spin_lock(&conn->queue_lock); 166 if (conn->close_cb == NULL) { 167 conn->close_cb = cb; 168 conn->close_cb_ctx = ctx; 169 } else { 170 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC; 171 } 172 pthread_spin_unlock(&conn->queue_lock); 173 174 return rc; 175 } 176 177 int 178 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn, 179 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 180 { 181 int rc = 0; 182 183 pthread_spin_lock(&conn->queue_lock); 184 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) { 185 rc = -ENOENT; 186 } else { 187 conn->close_cb = NULL; 188 } 189 pthread_spin_unlock(&conn->queue_lock); 190 191 return rc; 192 } 193 194 static int 195 jsonrpc_server_accept(struct spdk_jsonrpc_server *server) 196 { 197 struct spdk_jsonrpc_server_conn *conn; 198 int rc, flag; 199 200 rc = accept(server->sockfd, NULL, NULL); 201 if (rc >= 0) { 202 conn = TAILQ_FIRST(&server->free_conns); 203 assert(conn != NULL); 204 205 conn->server = server; 206 conn->sockfd = rc; 207 conn->closed = false; 208 conn->recv_len = 0; 209 conn->outstanding_requests = 0; 210 STAILQ_INIT(&conn->send_queue); 211 conn->send_request = NULL; 212 213 if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) { 214 SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd); 215 close(conn->sockfd); 216 return -1; 217 } 218 219 flag = fcntl(conn->sockfd, F_GETFL); 220 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 221 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 222 conn->sockfd, spdk_strerror(errno)); 223 close(conn->sockfd); 224 pthread_spin_destroy(&conn->queue_lock); 225 return -1; 226 } 227 228 TAILQ_REMOVE(&server->free_conns, conn, link); 229 TAILQ_INSERT_TAIL(&server->conns, conn, link); 230 return 0; 231 } 232 233 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 234 return 0; 235 } 236 237 return -1; 238 } 239 240 void 241 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request, 242 const struct spdk_json_val *method, const struct spdk_json_val *params) 243 { 244 request->conn->server->handle_request(request, method, params); 245 } 246 247 void 248 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error) 249 { 250 const char *msg; 251 252 switch (error) { 253 case SPDK_JSONRPC_ERROR_PARSE_ERROR: 254 msg = "Parse error"; 255 break; 256 257 case SPDK_JSONRPC_ERROR_INVALID_REQUEST: 258 msg = "Invalid request"; 259 break; 260 261 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND: 262 msg = "Method not found"; 263 break; 264 265 case SPDK_JSONRPC_ERROR_INVALID_PARAMS: 266 msg = "Invalid parameters"; 267 break; 268 269 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR: 270 msg = "Internal error"; 271 break; 272 273 default: 274 msg = "Error"; 275 break; 276 } 277 278 spdk_jsonrpc_send_error_response(request, error, msg); 279 } 280 281 static int 282 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn) 283 { 284 ssize_t rc, offset; 285 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len; 286 287 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0); 288 if (rc == -1) { 289 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 290 return 0; 291 } 292 SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno)); 293 return -1; 294 } 295 296 if (rc == 0) { 297 SPDK_DEBUGLOG(rpc, "remote closed connection\n"); 298 conn->closed = true; 299 return 0; 300 } 301 302 conn->recv_len += rc; 303 304 offset = 0; 305 do { 306 rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset); 307 if (rc < 0) { 308 SPDK_ERRLOG("jsonrpc parse request failed\n"); 309 return -1; 310 } 311 312 offset += rc; 313 } while (rc > 0); 314 315 if (offset > 0) { 316 /* 317 * Successfully parsed a requests - move any data past the end of the 318 * parsed requests down to the beginning. 319 */ 320 assert((size_t)offset <= conn->recv_len); 321 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset); 322 conn->recv_len -= offset; 323 } 324 325 return 0; 326 } 327 328 void 329 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request) 330 { 331 struct spdk_jsonrpc_server_conn *conn = request->conn; 332 333 /* Queue the response to be sent */ 334 pthread_spin_lock(&conn->queue_lock); 335 STAILQ_INSERT_TAIL(&conn->send_queue, request, link); 336 pthread_spin_unlock(&conn->queue_lock); 337 } 338 339 340 static int 341 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) 342 { 343 struct spdk_jsonrpc_request *request; 344 ssize_t rc; 345 346 more: 347 if (conn->outstanding_requests == 0) { 348 return 0; 349 } 350 351 if (conn->send_request == NULL) { 352 conn->send_request = jsonrpc_server_dequeue_request(conn); 353 } 354 355 request = conn->send_request; 356 if (request == NULL) { 357 /* Nothing to send right now */ 358 return 0; 359 } 360 361 if (request->send_len > 0) { 362 rc = send(conn->sockfd, request->send_buf + request->send_offset, 363 request->send_len, 0); 364 if (rc < 0) { 365 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 366 return 0; 367 } 368 369 SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno)); 370 return -1; 371 } 372 373 request->send_offset += rc; 374 request->send_len -= rc; 375 } 376 377 if (request->send_len == 0) { 378 /* 379 * Full response has been sent. 380 * Free it and set send_request to NULL to move on to the next queued response. 381 */ 382 conn->send_request = NULL; 383 jsonrpc_free_request(request); 384 goto more; 385 } 386 387 return 0; 388 } 389 390 int 391 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) 392 { 393 int rc; 394 struct spdk_jsonrpc_server_conn *conn, *conn_tmp; 395 396 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) { 397 /* If we can't receive and there are no outstanding requests close the connection. */ 398 if (conn->closed == true && conn->outstanding_requests == 0) { 399 jsonrpc_server_conn_close(conn); 400 } 401 402 if (conn->sockfd == -1 && conn->outstanding_requests == 0) { 403 jsonrpc_server_conn_remove(conn); 404 } 405 } 406 407 /* Check listen socket */ 408 if (!TAILQ_EMPTY(&server->free_conns)) { 409 jsonrpc_server_accept(server); 410 } 411 412 TAILQ_FOREACH(conn, &server->conns, link) { 413 if (conn->sockfd == -1) { 414 continue; 415 } 416 417 rc = jsonrpc_server_conn_send(conn); 418 if (rc != 0) { 419 jsonrpc_server_conn_close(conn); 420 continue; 421 } 422 423 if (!conn->closed) { 424 rc = jsonrpc_server_conn_recv(conn); 425 if (rc != 0) { 426 jsonrpc_server_conn_close(conn); 427 } 428 } 429 } 430 431 return 0; 432 } 433