1 /* $NetBSD: evrpc.c,v 1.5 2021/04/07 03:36:48 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 5 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 #include "event2/event-config.h" 30 #include <sys/cdefs.h> 31 __RCSID("$NetBSD: evrpc.c,v 1.5 2021/04/07 03:36:48 christos Exp $"); 32 #include "evconfig-private.h" 33 34 #ifdef _WIN32 35 #define WIN32_LEAN_AND_MEAN 36 #include <winsock2.h> 37 #include <windows.h> 38 #undef WIN32_LEAN_AND_MEAN 39 #endif 40 41 #include <sys/types.h> 42 #ifndef _WIN32 43 #include <sys/socket.h> 44 #endif 45 #ifdef EVENT__HAVE_SYS_TIME_H 46 #include <sys/time.h> 47 #endif 48 #include <sys/queue.h> 49 #include <stdio.h> 50 #include <stdlib.h> 51 #ifndef _WIN32 52 #include <unistd.h> 53 #endif 54 #include <errno.h> 55 #include <signal.h> 56 #include <string.h> 57 58 #include <sys/queue.h> 59 60 #include "event2/event.h" 61 #include "event2/event_struct.h" 62 #include "event2/rpc.h" 63 #include "event2/rpc_struct.h" 64 #include "evrpc-internal.h" 65 #include "event2/http.h" 66 #include "event2/buffer.h" 67 #include "event2/tag.h" 68 #include "event2/http_struct.h" 69 #include "event2/http_compat.h" 70 #include "event2/util.h" 71 #include "util-internal.h" 72 #include "log-internal.h" 73 #include "mm-internal.h" 74 75 struct evrpc_base * 76 evrpc_init(struct evhttp *http_server) 77 { 78 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 79 if (base == NULL) 80 return (NULL); 81 82 /* we rely on the tagging sub system */ 83 evtag_init(); 84 85 TAILQ_INIT(&base->registered_rpcs); 86 TAILQ_INIT(&base->input_hooks); 87 TAILQ_INIT(&base->output_hooks); 88 89 TAILQ_INIT(&base->paused_requests); 90 91 base->http_server = http_server; 92 93 return (base); 94 } 95 96 void 97 evrpc_free(struct evrpc_base *base) 98 { 99 struct evrpc *rpc; 100 struct evrpc_hook *hook; 101 struct evrpc_hook_ctx *pause; 102 int r; 103 104 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 105 r = evrpc_unregister_rpc(base, rpc->uri); 106 EVUTIL_ASSERT(r == 0); 107 } 108 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 109 TAILQ_REMOVE(&base->paused_requests, pause, next); 110 mm_free(pause); 111 } 112 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 113 r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 114 EVUTIL_ASSERT(r); 115 } 116 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 117 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 118 EVUTIL_ASSERT(r); 119 } 120 mm_free(base); 121 } 122 123 void * 124 evrpc_add_hook(void *vbase, 125 enum EVRPC_HOOK_TYPE hook_type, 126 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 127 void *cb_arg) 128 { 129 struct evrpc_hooks_ *base = vbase; 130 struct evrpc_hook_list *head = NULL; 131 struct evrpc_hook *hook = NULL; 132 switch (hook_type) { 133 case EVRPC_INPUT: 134 head = &base->in_hooks; 135 break; 136 case EVRPC_OUTPUT: 137 head = &base->out_hooks; 138 break; 139 default: 140 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 141 } 142 143 hook = mm_calloc(1, sizeof(struct evrpc_hook)); 144 EVUTIL_ASSERT(hook != NULL); 145 146 hook->process = cb; 147 hook->process_arg = cb_arg; 148 TAILQ_INSERT_TAIL(head, hook, next); 149 150 return (hook); 151 } 152 153 static int 154 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 155 { 156 struct evrpc_hook *hook = NULL; 157 TAILQ_FOREACH(hook, head, next) { 158 if (hook == handle) { 159 TAILQ_REMOVE(head, hook, next); 160 mm_free(hook); 161 return (1); 162 } 163 } 164 165 return (0); 166 } 167 168 /* 169 * remove the hook specified by the handle 170 */ 171 172 int 173 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 174 { 175 struct evrpc_hooks_ *base = vbase; 176 struct evrpc_hook_list *head = NULL; 177 switch (hook_type) { 178 case EVRPC_INPUT: 179 head = &base->in_hooks; 180 break; 181 case EVRPC_OUTPUT: 182 head = &base->out_hooks; 183 break; 184 default: 185 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 186 } 187 188 return (evrpc_remove_hook_internal(head, handle)); 189 } 190 191 static int 192 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 193 struct evhttp_request *req, struct evbuffer *evbuf) 194 { 195 struct evrpc_hook *hook; 196 TAILQ_FOREACH(hook, head, next) { 197 int res = hook->process(ctx, req, evbuf, hook->process_arg); 198 if (res != EVRPC_CONTINUE) 199 return (res); 200 } 201 202 return (EVRPC_CONTINUE); 203 } 204 205 static void evrpc_pool_schedule(struct evrpc_pool *pool); 206 static void evrpc_request_cb(struct evhttp_request *, void *); 207 208 /* 209 * Registers a new RPC with the HTTP server. The evrpc object is expected 210 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 211 * calls this function. 212 */ 213 214 static char * 215 evrpc_construct_uri(const char *uri) 216 { 217 char *constructed_uri; 218 size_t constructed_uri_len; 219 220 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 221 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 222 event_err(1, "%s: failed to register rpc at %s", 223 __func__, uri); 224 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 225 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 226 constructed_uri[constructed_uri_len - 1] = '\0'; 227 228 return (constructed_uri); 229 } 230 231 int 232 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 233 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 234 { 235 char *constructed_uri = evrpc_construct_uri(rpc->uri); 236 237 rpc->base = base; 238 rpc->cb = cb; 239 rpc->cb_arg = cb_arg; 240 241 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 242 243 evhttp_set_cb(base->http_server, 244 constructed_uri, 245 evrpc_request_cb, 246 rpc); 247 248 mm_free(constructed_uri); 249 250 return (0); 251 } 252 253 int 254 evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 255 { 256 char *registered_uri = NULL; 257 struct evrpc *rpc; 258 int r; 259 260 /* find the right rpc; linear search might be slow */ 261 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 262 if (strcmp(rpc->uri, name) == 0) 263 break; 264 } 265 if (rpc == NULL) { 266 /* We did not find an RPC with this name */ 267 return (-1); 268 } 269 TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 270 271 registered_uri = evrpc_construct_uri(name); 272 273 /* remove the http server callback */ 274 r = evhttp_del_cb(base->http_server, registered_uri); 275 EVUTIL_ASSERT(r == 0); 276 277 mm_free(registered_uri); 278 279 mm_free(__UNCONST(rpc->uri)); 280 mm_free(rpc); 281 return (0); 282 } 283 284 static int evrpc_pause_request(void *vbase, void *ctx, 285 void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 286 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 287 288 static void 289 evrpc_request_cb(struct evhttp_request *req, void *arg) 290 { 291 struct evrpc *rpc = arg; 292 struct evrpc_req_generic *rpc_state = NULL; 293 294 /* let's verify the outside parameters */ 295 if (req->type != EVHTTP_REQ_POST || 296 evbuffer_get_length(req->input_buffer) <= 0) 297 goto error; 298 299 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 300 if (rpc_state == NULL) 301 goto error; 302 rpc_state->rpc = rpc; 303 rpc_state->http_req = req; 304 rpc_state->rpc_data = NULL; 305 306 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 307 int hook_res; 308 309 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 310 311 /* 312 * allow hooks to modify the outgoing request 313 */ 314 hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 315 rpc_state, req, req->input_buffer); 316 switch (hook_res) { 317 case EVRPC_TERMINATE: 318 goto error; 319 case EVRPC_PAUSE: 320 evrpc_pause_request(rpc->base, rpc_state, 321 evrpc_request_cb_closure); 322 return; 323 case EVRPC_CONTINUE: 324 break; 325 default: 326 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 327 hook_res == EVRPC_CONTINUE || 328 hook_res == EVRPC_PAUSE); 329 } 330 } 331 332 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 333 return; 334 335 error: 336 if (rpc_state) 337 evrpc_reqstate_free_(rpc_state); 338 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 339 return; 340 } 341 342 static void 343 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 344 { 345 struct evrpc_req_generic *rpc_state = arg; 346 struct evrpc *rpc; 347 struct evhttp_request *req; 348 349 EVUTIL_ASSERT(rpc_state); 350 rpc = rpc_state->rpc; 351 req = rpc_state->http_req; 352 353 if (hook_res == EVRPC_TERMINATE) 354 goto error; 355 356 /* let's check that we can parse the request */ 357 rpc_state->request = rpc->request_new(rpc->request_new_arg); 358 if (rpc_state->request == NULL) 359 goto error; 360 361 if (rpc->request_unmarshal( 362 rpc_state->request, req->input_buffer) == -1) { 363 /* we failed to parse the request; that's a bummer */ 364 goto error; 365 } 366 367 /* at this point, we have a well formed request, prepare the reply */ 368 369 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 370 if (rpc_state->reply == NULL) 371 goto error; 372 373 /* give the rpc to the user; they can deal with it */ 374 rpc->cb(rpc_state, rpc->cb_arg); 375 376 return; 377 378 error: 379 evrpc_reqstate_free_(rpc_state); 380 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 381 return; 382 } 383 384 385 void 386 evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 387 { 388 struct evrpc *rpc; 389 EVUTIL_ASSERT(rpc_state != NULL); 390 rpc = rpc_state->rpc; 391 392 /* clean up all memory */ 393 if (rpc_state->hook_meta != NULL) 394 evrpc_hook_context_free_(rpc_state->hook_meta); 395 if (rpc_state->request != NULL) 396 rpc->request_free(rpc_state->request); 397 if (rpc_state->reply != NULL) 398 rpc->reply_free(rpc_state->reply); 399 if (rpc_state->rpc_data != NULL) 400 evbuffer_free(rpc_state->rpc_data); 401 mm_free(rpc_state); 402 } 403 404 static void 405 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 406 407 void 408 evrpc_request_done(struct evrpc_req_generic *rpc_state) 409 { 410 struct evhttp_request *req; 411 struct evrpc *rpc; 412 413 EVUTIL_ASSERT(rpc_state); 414 415 req = rpc_state->http_req; 416 rpc = rpc_state->rpc; 417 418 if (rpc->reply_complete(rpc_state->reply) == -1) { 419 /* the reply was not completely filled in. error out */ 420 goto error; 421 } 422 423 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 424 /* out of memory */ 425 goto error; 426 } 427 428 /* serialize the reply */ 429 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 430 431 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 432 int hook_res; 433 434 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 435 436 /* do hook based tweaks to the request */ 437 hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 438 rpc_state, req, rpc_state->rpc_data); 439 switch (hook_res) { 440 case EVRPC_TERMINATE: 441 goto error; 442 case EVRPC_PAUSE: 443 if (evrpc_pause_request(rpc->base, rpc_state, 444 evrpc_request_done_closure) == -1) 445 goto error; 446 return; 447 case EVRPC_CONTINUE: 448 break; 449 default: 450 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 451 hook_res == EVRPC_CONTINUE || 452 hook_res == EVRPC_PAUSE); 453 } 454 } 455 456 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 457 return; 458 459 error: 460 evrpc_reqstate_free_(rpc_state); 461 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 462 return; 463 } 464 465 void * 466 evrpc_get_request(struct evrpc_req_generic *req) 467 { 468 return req->request; 469 } 470 471 void * 472 evrpc_get_reply(struct evrpc_req_generic *req) 473 { 474 return req->reply; 475 } 476 477 static void 478 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 479 { 480 struct evrpc_req_generic *rpc_state = arg; 481 struct evhttp_request *req; 482 EVUTIL_ASSERT(rpc_state); 483 req = rpc_state->http_req; 484 485 if (hook_res == EVRPC_TERMINATE) 486 goto error; 487 488 /* on success, we are going to transmit marshaled binary data */ 489 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 490 evhttp_add_header(req->output_headers, 491 "Content-Type", "application/octet-stream"); 492 } 493 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 494 495 evrpc_reqstate_free_(rpc_state); 496 497 return; 498 499 error: 500 evrpc_reqstate_free_(rpc_state); 501 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 502 return; 503 } 504 505 506 /* Client implementation of RPC site */ 507 508 static int evrpc_schedule_request(struct evhttp_connection *connection, 509 struct evrpc_request_wrapper *ctx); 510 511 struct evrpc_pool * 512 evrpc_pool_new(struct event_base *base) 513 { 514 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 515 if (pool == NULL) 516 return (NULL); 517 518 TAILQ_INIT(&pool->connections); 519 TAILQ_INIT(&pool->requests); 520 521 TAILQ_INIT(&pool->paused_requests); 522 523 TAILQ_INIT(&pool->input_hooks); 524 TAILQ_INIT(&pool->output_hooks); 525 526 pool->base = base; 527 pool->timeout = -1; 528 529 return (pool); 530 } 531 532 static void 533 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 534 { 535 if (request->hook_meta != NULL) 536 evrpc_hook_context_free_(request->hook_meta); 537 mm_free(request->name); 538 mm_free(request); 539 } 540 541 void 542 evrpc_pool_free(struct evrpc_pool *pool) 543 { 544 struct evhttp_connection *connection; 545 struct evrpc_request_wrapper *request; 546 struct evrpc_hook_ctx *pause; 547 struct evrpc_hook *hook; 548 int r; 549 550 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 551 TAILQ_REMOVE(&pool->requests, request, next); 552 evrpc_request_wrapper_free(request); 553 } 554 555 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 556 TAILQ_REMOVE(&pool->paused_requests, pause, next); 557 mm_free(pause); 558 } 559 560 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 561 TAILQ_REMOVE(&pool->connections, connection, next); 562 evhttp_connection_free(connection); 563 } 564 565 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 566 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 567 EVUTIL_ASSERT(r); 568 } 569 570 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 571 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 572 EVUTIL_ASSERT(r); 573 } 574 575 mm_free(pool); 576 } 577 578 /* 579 * Add a connection to the RPC pool. A request scheduled on the pool 580 * may use any available connection. 581 */ 582 583 void 584 evrpc_pool_add_connection(struct evrpc_pool *pool, 585 struct evhttp_connection *connection) 586 { 587 EVUTIL_ASSERT(connection->http_server == NULL); 588 TAILQ_INSERT_TAIL(&pool->connections, connection, next); 589 590 /* 591 * associate an event base with this connection 592 */ 593 if (pool->base != NULL) 594 evhttp_connection_set_base(connection, pool->base); 595 596 /* 597 * unless a timeout was specifically set for a connection, 598 * the connection inherits the timeout from the pool. 599 */ 600 if (!evutil_timerisset(&connection->timeout)) 601 evhttp_connection_set_timeout(connection, pool->timeout); 602 603 /* 604 * if we have any requests pending, schedule them with the new 605 * connections. 606 */ 607 608 if (TAILQ_FIRST(&pool->requests) != NULL) { 609 struct evrpc_request_wrapper *request = 610 TAILQ_FIRST(&pool->requests); 611 TAILQ_REMOVE(&pool->requests, request, next); 612 evrpc_schedule_request(connection, request); 613 } 614 } 615 616 void 617 evrpc_pool_remove_connection(struct evrpc_pool *pool, 618 struct evhttp_connection *connection) 619 { 620 TAILQ_REMOVE(&pool->connections, connection, next); 621 } 622 623 void 624 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 625 { 626 struct evhttp_connection *evcon; 627 TAILQ_FOREACH(evcon, &pool->connections, next) { 628 evhttp_connection_set_timeout(evcon, timeout_in_secs); 629 } 630 pool->timeout = timeout_in_secs; 631 } 632 633 634 static void evrpc_reply_done(struct evhttp_request *, void *); 635 static void evrpc_request_timeout(evutil_socket_t, short, void *); 636 637 /* 638 * Finds a connection object associated with the pool that is currently 639 * idle and can be used to make a request. 640 */ 641 static struct evhttp_connection * 642 evrpc_pool_find_connection(struct evrpc_pool *pool) 643 { 644 struct evhttp_connection *connection; 645 TAILQ_FOREACH(connection, &pool->connections, next) { 646 if (TAILQ_FIRST(&connection->requests) == NULL) 647 return (connection); 648 } 649 650 return (NULL); 651 } 652 653 /* 654 * Prototypes responsible for evrpc scheduling and hooking 655 */ 656 657 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 658 659 /* 660 * We assume that the ctx is no longer queued on the pool. 661 */ 662 static int 663 evrpc_schedule_request(struct evhttp_connection *connection, 664 struct evrpc_request_wrapper *ctx) 665 { 666 struct evhttp_request *req = NULL; 667 struct evrpc_pool *pool = ctx->pool; 668 struct evrpc_status status; 669 670 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 671 goto error; 672 673 /* serialize the request data into the output buffer */ 674 ctx->request_marshal(req->output_buffer, ctx->request); 675 676 /* we need to know the connection that we might have to abort */ 677 ctx->evcon = connection; 678 679 /* if we get paused we also need to know the request */ 680 ctx->req = req; 681 682 if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 683 int hook_res; 684 685 evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 686 687 /* apply hooks to the outgoing request */ 688 hook_res = evrpc_process_hooks(&pool->output_hooks, 689 ctx, req, req->output_buffer); 690 691 switch (hook_res) { 692 case EVRPC_TERMINATE: 693 goto error; 694 case EVRPC_PAUSE: 695 /* we need to be explicitly resumed */ 696 if (evrpc_pause_request(pool, ctx, 697 evrpc_schedule_request_closure) == -1) 698 goto error; 699 return (0); 700 case EVRPC_CONTINUE: 701 /* we can just continue */ 702 break; 703 default: 704 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 705 hook_res == EVRPC_CONTINUE || 706 hook_res == EVRPC_PAUSE); 707 } 708 } 709 710 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 711 return (0); 712 713 error: 714 memset(&status, 0, sizeof(status)); 715 status.error = EVRPC_STATUS_ERR_UNSTARTED; 716 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 717 evrpc_request_wrapper_free(ctx); 718 return (-1); 719 } 720 721 static void 722 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 723 { 724 struct evrpc_request_wrapper *ctx = arg; 725 struct evhttp_connection *connection = ctx->evcon; 726 struct evhttp_request *req = ctx->req; 727 struct evrpc_pool *pool = ctx->pool; 728 struct evrpc_status status; 729 char *uri = NULL; 730 int res = 0; 731 732 if (hook_res == EVRPC_TERMINATE) 733 goto error; 734 735 uri = evrpc_construct_uri(ctx->name); 736 if (uri == NULL) 737 goto error; 738 739 if (pool->timeout > 0) { 740 /* 741 * a timeout after which the whole rpc is going to be aborted. 742 */ 743 struct timeval tv; 744 evutil_timerclear(&tv); 745 tv.tv_sec = pool->timeout; 746 evtimer_add(&ctx->ev_timeout, &tv); 747 } 748 749 /* start the request over the connection */ 750 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 751 mm_free(uri); 752 753 if (res == -1) 754 goto error; 755 756 return; 757 758 error: 759 memset(&status, 0, sizeof(status)); 760 status.error = EVRPC_STATUS_ERR_UNSTARTED; 761 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 762 evrpc_request_wrapper_free(ctx); 763 } 764 765 /* we just queue the paused request on the pool under the req object */ 766 static int 767 evrpc_pause_request(void *vbase, void *ctx, 768 void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 769 { 770 struct evrpc_hooks_ *base = vbase; 771 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 772 if (pause == NULL) 773 return (-1); 774 775 pause->ctx = ctx; 776 pause->cb = cb; 777 778 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 779 return (0); 780 } 781 782 int 783 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 784 { 785 struct evrpc_hooks_ *base = vbase; 786 struct evrpc_pause_list *head = &base->pause_requests; 787 struct evrpc_hook_ctx *pause; 788 789 TAILQ_FOREACH(pause, head, next) { 790 if (pause->ctx == ctx) 791 break; 792 } 793 794 if (pause == NULL) 795 return (-1); 796 797 (*pause->cb)(pause->ctx, res); 798 TAILQ_REMOVE(head, pause, next); 799 mm_free(pause); 800 return (0); 801 } 802 803 int 804 evrpc_make_request(struct evrpc_request_wrapper *ctx) 805 { 806 struct evrpc_pool *pool = ctx->pool; 807 808 /* initialize the event structure for this rpc */ 809 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 810 811 /* we better have some available connections on the pool */ 812 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 813 814 /* 815 * if no connection is available, we queue the request on the pool, 816 * the next time a connection is empty, the rpc will be send on that. 817 */ 818 TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 819 820 evrpc_pool_schedule(pool); 821 822 return (0); 823 } 824 825 826 struct evrpc_request_wrapper * 827 evrpc_make_request_ctx( 828 struct evrpc_pool *pool, void *request, void *reply, 829 const char *rpcname, 830 void (*req_marshal)(struct evbuffer*, void *), 831 void (*rpl_clear)(void *), 832 int (*rpl_unmarshal)(void *, struct evbuffer *), 833 void (*cb)(struct evrpc_status *, void *, void *, void *), 834 void *cbarg) 835 { 836 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 837 mm_malloc(sizeof(struct evrpc_request_wrapper)); 838 if (ctx == NULL) 839 return (NULL); 840 841 ctx->pool = pool; 842 ctx->hook_meta = NULL; 843 ctx->evcon = NULL; 844 ctx->name = mm_strdup(rpcname); 845 if (ctx->name == NULL) { 846 mm_free(ctx); 847 return (NULL); 848 } 849 ctx->cb = cb; 850 ctx->cb_arg = cbarg; 851 ctx->request = request; 852 ctx->reply = reply; 853 ctx->request_marshal = req_marshal; 854 ctx->reply_clear = rpl_clear; 855 ctx->reply_unmarshal = rpl_unmarshal; 856 857 return (ctx); 858 } 859 860 static void 861 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 862 863 static void 864 evrpc_reply_done(struct evhttp_request *req, void *arg) 865 { 866 struct evrpc_request_wrapper *ctx = arg; 867 struct evrpc_pool *pool = ctx->pool; 868 int hook_res = EVRPC_CONTINUE; 869 870 /* cancel any timeout we might have scheduled */ 871 event_del(&ctx->ev_timeout); 872 873 ctx->req = req; 874 875 /* we need to get the reply now */ 876 if (req == NULL) { 877 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 878 return; 879 } 880 881 if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 882 evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 883 884 /* apply hooks to the incoming request */ 885 hook_res = evrpc_process_hooks(&pool->input_hooks, 886 ctx, req, req->input_buffer); 887 888 switch (hook_res) { 889 case EVRPC_TERMINATE: 890 case EVRPC_CONTINUE: 891 break; 892 case EVRPC_PAUSE: 893 /* 894 * if we get paused we also need to know the 895 * request. unfortunately, the underlying 896 * layer is going to free it. we need to 897 * request ownership explicitly 898 */ 899 evhttp_request_own(req); 900 901 evrpc_pause_request(pool, ctx, 902 evrpc_reply_done_closure); 903 return; 904 default: 905 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 906 hook_res == EVRPC_CONTINUE || 907 hook_res == EVRPC_PAUSE); 908 } 909 } 910 911 evrpc_reply_done_closure(ctx, hook_res); 912 913 /* http request is being freed by underlying layer */ 914 } 915 916 static void 917 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 918 { 919 struct evrpc_request_wrapper *ctx = arg; 920 struct evhttp_request *req = ctx->req; 921 struct evrpc_pool *pool = ctx->pool; 922 struct evrpc_status status; 923 int res = -1; 924 925 memset(&status, 0, sizeof(status)); 926 status.http_req = req; 927 928 /* we need to get the reply now */ 929 if (req == NULL) { 930 status.error = EVRPC_STATUS_ERR_TIMEOUT; 931 } else if (hook_res == EVRPC_TERMINATE) { 932 status.error = EVRPC_STATUS_ERR_HOOKABORTED; 933 } else { 934 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 935 if (res == -1) 936 status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 937 } 938 939 if (res == -1) { 940 /* clear everything that we might have written previously */ 941 ctx->reply_clear(ctx->reply); 942 } 943 944 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 945 946 evrpc_request_wrapper_free(ctx); 947 948 /* the http layer owned the original request structure, but if we 949 * got paused, we asked for ownership and need to free it here. */ 950 if (req != NULL && evhttp_request_is_owned(req)) 951 evhttp_request_free(req); 952 953 /* see if we can schedule another request */ 954 evrpc_pool_schedule(pool); 955 } 956 957 static void 958 evrpc_pool_schedule(struct evrpc_pool *pool) 959 { 960 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 961 struct evhttp_connection *evcon; 962 963 /* if no requests are pending, we have no work */ 964 if (ctx == NULL) 965 return; 966 967 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 968 TAILQ_REMOVE(&pool->requests, ctx, next); 969 evrpc_schedule_request(evcon, ctx); 970 } 971 } 972 973 static void 974 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 975 { 976 struct evrpc_request_wrapper *ctx = arg; 977 struct evhttp_connection *evcon = ctx->evcon; 978 EVUTIL_ASSERT(evcon != NULL); 979 980 evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 981 } 982 983 /* 984 * frees potential meta data associated with a request. 985 */ 986 987 static void 988 evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 989 { 990 struct evrpc_meta *entry; 991 EVUTIL_ASSERT(meta_data != NULL); 992 993 while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 994 TAILQ_REMOVE(meta_data, entry, next); 995 mm_free(entry->key); 996 mm_free(entry->data); 997 mm_free(entry); 998 } 999 } 1000 1001 static struct evrpc_hook_meta * 1002 evrpc_hook_meta_new_(void) 1003 { 1004 struct evrpc_hook_meta *ctx; 1005 ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1006 EVUTIL_ASSERT(ctx != NULL); 1007 1008 TAILQ_INIT(&ctx->meta_data); 1009 ctx->evcon = NULL; 1010 1011 return (ctx); 1012 } 1013 1014 static void 1015 evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 1016 struct evhttp_connection *evcon) 1017 { 1018 struct evrpc_hook_meta *ctx = *pctx; 1019 if (ctx == NULL) 1020 *pctx = ctx = evrpc_hook_meta_new_(); 1021 ctx->evcon = evcon; 1022 } 1023 1024 static void 1025 evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 1026 { 1027 evrpc_meta_data_free(&ctx->meta_data); 1028 mm_free(ctx); 1029 } 1030 1031 /* Adds meta data */ 1032 void 1033 evrpc_hook_add_meta(void *ctx, const char *key, 1034 const void *data, size_t data_size) 1035 { 1036 struct evrpc_request_wrapper *req = ctx; 1037 struct evrpc_hook_meta *store = NULL; 1038 struct evrpc_meta *meta = NULL; 1039 1040 if ((store = req->hook_meta) == NULL) 1041 store = req->hook_meta = evrpc_hook_meta_new_(); 1042 1043 meta = mm_malloc(sizeof(struct evrpc_meta)); 1044 EVUTIL_ASSERT(meta != NULL); 1045 meta->key = mm_strdup(key); 1046 EVUTIL_ASSERT(meta->key != NULL); 1047 meta->data_size = data_size; 1048 meta->data = mm_malloc(data_size); 1049 EVUTIL_ASSERT(meta->data != NULL); 1050 memcpy(meta->data, data, data_size); 1051 1052 TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1053 } 1054 1055 int 1056 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1057 { 1058 struct evrpc_request_wrapper *req = ctx; 1059 struct evrpc_meta *meta = NULL; 1060 1061 if (req->hook_meta == NULL) 1062 return (-1); 1063 1064 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1065 if (strcmp(meta->key, key) == 0) { 1066 *data = meta->data; 1067 *data_size = meta->data_size; 1068 return (0); 1069 } 1070 } 1071 1072 return (-1); 1073 } 1074 1075 struct evhttp_connection * 1076 evrpc_hook_get_connection(void *ctx) 1077 { 1078 struct evrpc_request_wrapper *req = ctx; 1079 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1080 } 1081 1082 int 1083 evrpc_send_request_generic(struct evrpc_pool *pool, 1084 void *request, void *reply, 1085 void (*cb)(struct evrpc_status *, void *, void *, void *), 1086 void *cb_arg, 1087 const char *rpcname, 1088 void (*req_marshal)(struct evbuffer *, void *), 1089 void (*rpl_clear)(void *), 1090 int (*rpl_unmarshal)(void *, struct evbuffer *)) 1091 { 1092 struct evrpc_status status; 1093 struct evrpc_request_wrapper *ctx; 1094 ctx = evrpc_make_request_ctx(pool, request, reply, 1095 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1096 if (ctx == NULL) 1097 goto error; 1098 return (evrpc_make_request(ctx)); 1099 error: 1100 memset(&status, 0, sizeof(status)); 1101 status.error = EVRPC_STATUS_ERR_UNSTARTED; 1102 (*(cb))(&status, request, reply, cb_arg); 1103 return (-1); 1104 } 1105 1106 /** Takes a request object and fills it in with the right magic */ 1107 static struct evrpc * 1108 evrpc_register_object(const char *name, 1109 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1110 int (*req_unmarshal)(void *, struct evbuffer *), 1111 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1112 int (*rpl_complete)(void *), 1113 void (*rpl_marshal)(struct evbuffer *, void *)) 1114 { 1115 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1116 if (rpc == NULL) 1117 return (NULL); 1118 rpc->uri = mm_strdup(name); 1119 if (rpc->uri == NULL) { 1120 mm_free(rpc); 1121 return (NULL); 1122 } 1123 rpc->request_new = req_new; 1124 rpc->request_new_arg = req_new_arg; 1125 rpc->request_free = req_free; 1126 rpc->request_unmarshal = req_unmarshal; 1127 rpc->reply_new = rpl_new; 1128 rpc->reply_new_arg = rpl_new_arg; 1129 rpc->reply_free = rpl_free; 1130 rpc->reply_complete = rpl_complete; 1131 rpc->reply_marshal = rpl_marshal; 1132 return (rpc); 1133 } 1134 1135 int 1136 evrpc_register_generic(struct evrpc_base *base, const char *name, 1137 void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1138 void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1139 int (*req_unmarshal)(void *, struct evbuffer *), 1140 void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1141 int (*rpl_complete)(void *), 1142 void (*rpl_marshal)(struct evbuffer *, void *)) 1143 { 1144 struct evrpc* rpc = 1145 evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1146 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1147 if (rpc == NULL) 1148 return (-1); 1149 evrpc_register_rpc(base, rpc, 1150 (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1151 return (0); 1152 } 1153 1154 /** accessors for obscure and undocumented functionality */ 1155 struct evrpc_pool * 1156 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1157 { 1158 return (ctx->pool); 1159 } 1160 1161 void 1162 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1163 struct evrpc_pool *pool) 1164 { 1165 ctx->pool = pool; 1166 } 1167 1168 void 1169 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1170 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1171 void *cb_arg) 1172 { 1173 ctx->cb = cb; 1174 ctx->cb_arg = cb_arg; 1175 } 1176