1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "jsonrpc_internal.h" 7 #include "spdk/string.h" 8 #include "spdk/util.h" 9 10 struct spdk_jsonrpc_server * 11 spdk_jsonrpc_server_listen(int domain, int protocol, 12 struct sockaddr *listen_addr, socklen_t addrlen, 13 spdk_jsonrpc_handle_request_fn handle_request) 14 { 15 struct spdk_jsonrpc_server *server; 16 int rc, val, i; 17 18 server = calloc(1, sizeof(struct spdk_jsonrpc_server)); 19 if (server == NULL) { 20 return NULL; 21 } 22 23 TAILQ_INIT(&server->free_conns); 24 TAILQ_INIT(&server->conns); 25 26 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) { 27 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link); 28 } 29 30 server->handle_request = handle_request; 31 32 server->sockfd = socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol); 33 if (server->sockfd < 0) { 34 SPDK_ERRLOG("socket() failed\n"); 35 free(server); 36 return NULL; 37 } 38 39 val = 1; 40 rc = setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); 41 if (rc != 0) { 42 SPDK_ERRLOG("could not set SO_REUSEADDR sock option: %s\n", spdk_strerror(errno)); 43 close(server->sockfd); 44 free(server); 45 return NULL; 46 } 47 48 rc = bind(server->sockfd, listen_addr, addrlen); 49 if (rc != 0) { 50 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno)); 51 close(server->sockfd); 52 free(server); 53 return NULL; 54 } 55 56 rc = listen(server->sockfd, 512); 57 if (rc != 0) { 58 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 59 close(server->sockfd); 60 free(server); 61 return NULL; 62 } 63 64 return server; 65 } 66 67 static struct spdk_jsonrpc_request * 68 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn) 69 { 70 struct spdk_jsonrpc_request *request = NULL; 71 72 pthread_spin_lock(&conn->queue_lock); 73 request = STAILQ_FIRST(&conn->send_queue); 74 if (request) { 75 STAILQ_REMOVE_HEAD(&conn->send_queue, link); 76 } 77 pthread_spin_unlock(&conn->queue_lock); 78 return request; 79 } 80 81 static void 82 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn) 83 { 84 struct spdk_jsonrpc_request *request; 85 86 jsonrpc_free_request(conn->send_request); 87 conn->send_request = NULL ; 88 89 pthread_spin_lock(&conn->queue_lock); 90 /* There might still be some requests being processed. 91 * We need to tell them that this connection is closed. */ 92 STAILQ_FOREACH(request, &conn->outstanding_queue, link) { 93 request->conn = NULL; 94 } 95 pthread_spin_unlock(&conn->queue_lock); 96 97 while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) { 98 jsonrpc_free_request(request); 99 } 100 } 101 102 static void 103 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) 104 { 105 conn->closed = true; 106 107 if (conn->sockfd >= 0) { 108 jsonrpc_server_free_conn_request(conn); 109 close(conn->sockfd); 110 conn->sockfd = -1; 111 112 if (conn->close_cb) { 113 conn->close_cb(conn, conn->close_cb_ctx); 114 } 115 } 116 } 117 118 void 119 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server) 120 { 121 struct spdk_jsonrpc_server_conn *conn; 122 123 close(server->sockfd); 124 125 TAILQ_FOREACH(conn, &server->conns, link) { 126 jsonrpc_server_conn_close(conn); 127 } 128 129 free(server); 130 } 131 132 static void 133 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) 134 { 135 struct spdk_jsonrpc_server *server = conn->server; 136 137 jsonrpc_server_conn_close(conn); 138 139 pthread_spin_destroy(&conn->queue_lock); 140 assert(STAILQ_EMPTY(&conn->send_queue)); 141 142 TAILQ_REMOVE(&server->conns, conn, link); 143 TAILQ_INSERT_HEAD(&server->free_conns, conn, link); 144 } 145 146 int 147 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn, 148 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 149 { 150 int rc = 0; 151 152 pthread_spin_lock(&conn->queue_lock); 153 if (conn->close_cb == NULL) { 154 conn->close_cb = cb; 155 conn->close_cb_ctx = ctx; 156 } else { 157 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC; 158 } 159 pthread_spin_unlock(&conn->queue_lock); 160 161 return rc; 162 } 163 164 int 165 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn, 166 spdk_jsonrpc_conn_closed_fn cb, void *ctx) 167 { 168 int rc = 0; 169 170 pthread_spin_lock(&conn->queue_lock); 171 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) { 172 rc = -ENOENT; 173 } else { 174 conn->close_cb = NULL; 175 } 176 pthread_spin_unlock(&conn->queue_lock); 177 178 return rc; 179 } 180 181 static int 182 jsonrpc_server_accept(struct spdk_jsonrpc_server *server) 183 { 184 struct spdk_jsonrpc_server_conn *conn; 185 int rc, flag; 186 187 rc = accept(server->sockfd, NULL, NULL); 188 if (rc >= 0) { 189 conn = TAILQ_FIRST(&server->free_conns); 190 assert(conn != NULL); 191 192 conn->server = server; 193 conn->sockfd = rc; 194 conn->closed = false; 195 conn->recv_len = 0; 196 conn->outstanding_requests = 0; 197 STAILQ_INIT(&conn->send_queue); 198 STAILQ_INIT(&conn->outstanding_queue); 199 conn->send_request = NULL; 200 201 if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) { 202 SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd); 203 close(conn->sockfd); 204 return -1; 205 } 206 207 flag = fcntl(conn->sockfd, F_GETFL); 208 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) { 209 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n", 210 conn->sockfd, spdk_strerror(errno)); 211 close(conn->sockfd); 212 pthread_spin_destroy(&conn->queue_lock); 213 return -1; 214 } 215 216 TAILQ_REMOVE(&server->free_conns, conn, link); 217 TAILQ_INSERT_TAIL(&server->conns, conn, link); 218 return 0; 219 } 220 221 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 222 return 0; 223 } 224 225 return -1; 226 } 227 228 void 229 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request, 230 const struct spdk_json_val *method, const struct spdk_json_val *params) 231 { 232 request->conn->server->handle_request(request, method, params); 233 } 234 235 void 236 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error) 237 { 238 const char *msg; 239 240 switch (error) { 241 case SPDK_JSONRPC_ERROR_PARSE_ERROR: 242 msg = "Parse error"; 243 break; 244 245 case SPDK_JSONRPC_ERROR_INVALID_REQUEST: 246 msg = "Invalid request"; 247 break; 248 249 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND: 250 msg = "Method not found"; 251 break; 252 253 case SPDK_JSONRPC_ERROR_INVALID_PARAMS: 254 msg = "Invalid parameters"; 255 break; 256 257 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR: 258 msg = "Internal error"; 259 break; 260 261 default: 262 msg = "Error"; 263 break; 264 } 265 266 spdk_jsonrpc_send_error_response(request, error, msg); 267 } 268 269 static int 270 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn) 271 { 272 ssize_t rc, offset; 273 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len; 274 275 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0); 276 if (rc == -1) { 277 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 278 return 0; 279 } 280 SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno)); 281 return -1; 282 } 283 284 if (rc == 0) { 285 SPDK_DEBUGLOG(rpc, "remote closed connection\n"); 286 conn->closed = true; 287 return 0; 288 } 289 290 conn->recv_len += rc; 291 292 offset = 0; 293 do { 294 rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset); 295 if (rc < 0) { 296 SPDK_ERRLOG("jsonrpc parse request failed\n"); 297 return -1; 298 } 299 300 offset += rc; 301 } while (rc > 0); 302 303 if (offset > 0) { 304 /* 305 * Successfully parsed a requests - move any data past the end of the 306 * parsed requests down to the beginning. 307 */ 308 assert((size_t)offset <= conn->recv_len); 309 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset); 310 conn->recv_len -= offset; 311 } 312 313 return 0; 314 } 315 316 void 317 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request) 318 { 319 struct spdk_jsonrpc_server_conn *conn = request->conn; 320 321 if (conn == NULL) { 322 /* We cannot respond to the request, because the connection is closed. */ 323 SPDK_WARNLOG("Unable to send response: connection closed.\n"); 324 jsonrpc_free_request(request); 325 return; 326 } 327 328 /* Queue the response to be sent */ 329 pthread_spin_lock(&conn->queue_lock); 330 STAILQ_REMOVE(&conn->outstanding_queue, request, spdk_jsonrpc_request, link); 331 STAILQ_INSERT_TAIL(&conn->send_queue, request, link); 332 pthread_spin_unlock(&conn->queue_lock); 333 } 334 335 336 static int 337 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn) 338 { 339 struct spdk_jsonrpc_request *request; 340 ssize_t rc; 341 342 more: 343 if (conn->outstanding_requests == 0) { 344 return 0; 345 } 346 347 if (conn->send_request == NULL) { 348 conn->send_request = jsonrpc_server_dequeue_request(conn); 349 } 350 351 request = conn->send_request; 352 if (request == NULL) { 353 /* Nothing to send right now */ 354 return 0; 355 } 356 357 if (request->send_offset == 0) { 358 /* A byte for the null terminator is included in the send buffer. */ 359 request->send_buf[request->send_len] = '\0'; 360 } 361 362 if (request->send_len > 0) { 363 rc = send(conn->sockfd, request->send_buf + request->send_offset, 364 request->send_len, 0); 365 if (rc < 0) { 366 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 367 return 0; 368 } 369 370 SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno)); 371 return -1; 372 } 373 374 request->send_offset += rc; 375 request->send_len -= rc; 376 } 377 378 if (request->send_len == 0) { 379 /* 380 * Full response has been sent. 381 * Free it and set send_request to NULL to move on to the next queued response. 382 */ 383 conn->send_request = NULL; 384 jsonrpc_complete_request(request); 385 goto more; 386 } 387 388 return 0; 389 } 390 391 int 392 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server) 393 { 394 int rc; 395 struct spdk_jsonrpc_server_conn *conn, *conn_tmp; 396 397 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) { 398 /* If we can't receive and there are no outstanding requests close the connection. */ 399 if (conn->closed == true && conn->outstanding_requests == 0) { 400 jsonrpc_server_conn_close(conn); 401 } 402 403 if (conn->sockfd == -1 && conn->outstanding_requests == 0) { 404 jsonrpc_server_conn_remove(conn); 405 } 406 } 407 408 /* Check listen socket */ 409 if (!TAILQ_EMPTY(&server->free_conns)) { 410 jsonrpc_server_accept(server); 411 } 412 413 TAILQ_FOREACH(conn, &server->conns, link) { 414 if (conn->sockfd == -1) { 415 continue; 416 } 417 418 rc = jsonrpc_server_conn_send(conn); 419 if (rc != 0) { 420 jsonrpc_server_conn_close(conn); 421 continue; 422 } 423 424 if (!conn->closed) { 425 rc = jsonrpc_server_conn_recv(conn); 426 if (rc != 0) { 427 jsonrpc_server_conn_close(conn); 428 } 429 } 430 } 431 432 return 0; 433 } 434