1 /* $NetBSD: evrpc.c,v 1.5 2020/05/25 20:47:33 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 5 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 #include "event2/event-config.h" 30 #include "evconfig-private.h" 31 32 #ifdef _WIN32 33 #define WIN32_LEAN_AND_MEAN 34 #include <winsock2.h> 35 #include <windows.h> 36 #undef WIN32_LEAN_AND_MEAN 37 #endif 38 39 #include <sys/types.h> 40 #ifndef _WIN32 41 #include <sys/socket.h> 42 #endif 43 #ifdef EVENT__HAVE_SYS_TIME_H 44 #include <sys/time.h> 45 #endif 46 #include <sys/queue.h> 47 #include <stdio.h> 48 #include <stdlib.h> 49 #ifndef _WIN32 50 #include <unistd.h> 51 #endif 52 #include <errno.h> 53 #include <signal.h> 54 #include <string.h> 55 56 #include <sys/queue.h> 57 58 #include "event2/event.h" 59 #include "event2/event_struct.h" 60 #include "event2/rpc.h" 61 #include "event2/rpc_struct.h" 62 #include "evrpc-internal.h" 63 #include "event2/http.h" 64 #include "event2/buffer.h" 65 #include "event2/tag.h" 66 #include "event2/http_struct.h" 67 #include "event2/http_compat.h" 68 #include "event2/util.h" 69 #include "util-internal.h" 70 #include "log-internal.h" 71 #include "mm-internal.h" 72 73 struct evrpc_base * 74 evrpc_init(struct evhttp *http_server) 75 { 76 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 77 if (base == NULL) 78 return (NULL); 79 80 /* we rely on the tagging sub system */ 81 evtag_init(); 82 83 TAILQ_INIT(&base->registered_rpcs); 84 TAILQ_INIT(&base->input_hooks); 85 TAILQ_INIT(&base->output_hooks); 86 87 TAILQ_INIT(&base->paused_requests); 88 89 base->http_server = http_server; 90 91 return (base); 92 } 93 94 void 95 evrpc_free(struct evrpc_base *base) 96 { 97 struct evrpc *rpc; 98 struct evrpc_hook *hook; 99 struct evrpc_hook_ctx *pause; 100 int r; 101 102 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 103 r = evrpc_unregister_rpc(base, rpc->uri); 104 EVUTIL_ASSERT(r == 0); 105 } 106 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 107 TAILQ_REMOVE(&base->paused_requests, pause, next); 108 mm_free(pause); 109 } 110 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 111 r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 112 EVUTIL_ASSERT(r); 113 } 114 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 115 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 116 EVUTIL_ASSERT(r); 117 } 118 mm_free(base); 119 } 120 121 void * 122 evrpc_add_hook(void *vbase, 123 enum EVRPC_HOOK_TYPE hook_type, 124 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 125 void *cb_arg) 126 { 127 struct evrpc_hooks_ *base = vbase; 128 struct evrpc_hook_list *head = NULL; 129 struct evrpc_hook *hook = NULL; 130 switch (hook_type) { 131 case EVRPC_INPUT: 132 head = &base->in_hooks; 133 break; 134 case EVRPC_OUTPUT: 135 head = &base->out_hooks; 136 break; 137 default: 138 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 139 } 140 141 hook = mm_calloc(1, sizeof(struct evrpc_hook)); 142 EVUTIL_ASSERT(hook != NULL); 143 144 hook->process = cb; 145 hook->process_arg = cb_arg; 146 TAILQ_INSERT_TAIL(head, hook, next); 147 148 return (hook); 149 } 150 151 static int 152 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 153 { 154 struct evrpc_hook *hook = NULL; 155 TAILQ_FOREACH(hook, head, next) { 156 if (hook == handle) { 157 TAILQ_REMOVE(head, hook, next); 158 mm_free(hook); 159 return (1); 160 } 161 } 162 163 return (0); 164 } 165 166 /* 167 * remove the hook specified by the handle 168 */ 169 170 int 171 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 172 { 173 struct evrpc_hooks_ *base = vbase; 174 struct evrpc_hook_list *head = NULL; 175 switch (hook_type) { 176 case EVRPC_INPUT: 177 head = &base->in_hooks; 178 break; 179 case EVRPC_OUTPUT: 180 head = &base->out_hooks; 181 break; 182 default: 183 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 184 } 185 186 return (evrpc_remove_hook_internal(head, handle)); 187 } 188 189 static int 190 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 191 struct evhttp_request *req, struct evbuffer *evbuf) 192 { 193 struct evrpc_hook *hook; 194 TAILQ_FOREACH(hook, head, next) { 195 int res = hook->process(ctx, req, evbuf, hook->process_arg); 196 if (res != EVRPC_CONTINUE) 197 return (res); 198 } 199 200 return (EVRPC_CONTINUE); 201 } 202 203 static void evrpc_pool_schedule(struct evrpc_pool *pool); 204 static void evrpc_request_cb(struct evhttp_request *, void *); 205 206 /* 207 * Registers a new RPC with the HTTP server. The evrpc object is expected 208 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 209 * calls this function. 210 */ 211 212 static char * 213 evrpc_construct_uri(const char *uri) 214 { 215 char *constructed_uri; 216 size_t constructed_uri_len; 217 218 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 219 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 220 event_err(1, "%s: failed to register rpc at %s", 221 __func__, uri); 222 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 223 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 224 constructed_uri[constructed_uri_len - 1] = '\0'; 225 226 return (constructed_uri); 227 } 228 229 int 230 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 231 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 232 { 233 char *constructed_uri = evrpc_construct_uri(rpc->uri); 234 235 rpc->base = base; 236 rpc->cb = cb; 237 rpc->cb_arg = cb_arg; 238 239 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 240 241 evhttp_set_cb(base->http_server, 242 constructed_uri, 243 evrpc_request_cb, 244 rpc); 245 246 mm_free(constructed_uri); 247 248 return (0); 249 } 250 251 int 252 evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 253 { 254 char *registered_uri = NULL; 255 struct evrpc *rpc; 256 int r; 257 258 /* find the right rpc; linear search might be slow */ 259 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 260 if (strcmp(rpc->uri, name) == 0) 261 break; 262 } 263 if (rpc == NULL) { 264 /* We did not find an RPC with this name */ 265 return (-1); 266 } 267 TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 268 269 registered_uri = evrpc_construct_uri(name); 270 271 /* remove the http server callback */ 272 r = evhttp_del_cb(base->http_server, registered_uri); 273 EVUTIL_ASSERT(r == 0); 274 275 mm_free(registered_uri); 276 277 mm_free((char *)rpc->uri); 278 mm_free(rpc); 279 return (0); 280 } 281 282 static int evrpc_pause_request(void *vbase, void *ctx, 283 void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 284 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 285 286 static void 287 evrpc_request_cb(struct evhttp_request *req, void *arg) 288 { 289 struct evrpc *rpc = arg; 290 struct evrpc_req_generic *rpc_state = NULL; 291 292 /* let's verify the outside parameters */ 293 if (req->type != EVHTTP_REQ_POST || 294 evbuffer_get_length(req->input_buffer) <= 0) 295 goto error; 296 297 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 298 if (rpc_state == NULL) 299 goto error; 300 rpc_state->rpc = rpc; 301 rpc_state->http_req = req; 302 rpc_state->rpc_data = NULL; 303 304 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 305 int hook_res; 306 307 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 308 309 /* 310 * allow hooks to modify the outgoing request 311 */ 312 hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 313 rpc_state, req, req->input_buffer); 314 switch (hook_res) { 315 case EVRPC_TERMINATE: 316 goto error; 317 case EVRPC_PAUSE: 318 evrpc_pause_request(rpc->base, rpc_state, 319 evrpc_request_cb_closure); 320 return; 321 case EVRPC_CONTINUE: 322 break; 323 default: 324 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 325 hook_res == EVRPC_CONTINUE || 326 hook_res == EVRPC_PAUSE); 327 } 328 } 329 330 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 331 return; 332 333 error: 334 evrpc_reqstate_free_(rpc_state); 335 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 336 return; 337 } 338 339 static void 340 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 341 { 342 struct evrpc_req_generic *rpc_state = arg; 343 struct evrpc *rpc; 344 struct evhttp_request *req; 345 346 EVUTIL_ASSERT(rpc_state); 347 rpc = rpc_state->rpc; 348 req = rpc_state->http_req; 349 350 if (hook_res == EVRPC_TERMINATE) 351 goto error; 352 353 /* let's check that we can parse the request */ 354 rpc_state->request = rpc->request_new(rpc->request_new_arg); 355 if (rpc_state->request == NULL) 356 goto error; 357 358 if (rpc->request_unmarshal( 359 rpc_state->request, req->input_buffer) == -1) { 360 /* we failed to parse the request; that's a bummer */ 361 goto error; 362 } 363 364 /* at this point, we have a well formed request, prepare the reply */ 365 366 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 367 if (rpc_state->reply == NULL) 368 goto error; 369 370 /* give the rpc to the user; they can deal with it */ 371 rpc->cb(rpc_state, rpc->cb_arg); 372 373 return; 374 375 error: 376 evrpc_reqstate_free_(rpc_state); 377 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 378 return; 379 } 380 381 382 void 383 evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 384 { 385 struct evrpc *rpc; 386 EVUTIL_ASSERT(rpc_state != NULL); 387 rpc = rpc_state->rpc; 388 389 /* clean up all memory */ 390 if (rpc_state->hook_meta != NULL) 391 evrpc_hook_context_free_(rpc_state->hook_meta); 392 if (rpc_state->request != NULL) 393 rpc->request_free(rpc_state->request); 394 if (rpc_state->reply != NULL) 395 rpc->reply_free(rpc_state->reply); 396 if (rpc_state->rpc_data != NULL) 397 evbuffer_free(rpc_state->rpc_data); 398 mm_free(rpc_state); 399 } 400 401 static void 402 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 403 404 void 405 evrpc_request_done(struct evrpc_req_generic *rpc_state) 406 { 407 struct evhttp_request *req; 408 struct evrpc *rpc; 409 410 EVUTIL_ASSERT(rpc_state); 411 412 req = rpc_state->http_req; 413 rpc = rpc_state->rpc; 414 415 if (rpc->reply_complete(rpc_state->reply) == -1) { 416 /* the reply was not completely filled in. error out */ 417 goto error; 418 } 419 420 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 421 /* out of memory */ 422 goto error; 423 } 424 425 /* serialize the reply */ 426 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 427 428 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 429 int hook_res; 430 431 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 432 433 /* do hook based tweaks to the request */ 434 hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 435 rpc_state, req, rpc_state->rpc_data); 436 switch (hook_res) { 437 case EVRPC_TERMINATE: 438 goto error; 439 case EVRPC_PAUSE: 440 if (evrpc_pause_request(rpc->base, rpc_state, 441 evrpc_request_done_closure) == -1) 442 goto error; 443 return; 444 case EVRPC_CONTINUE: 445 break; 446 default: 447 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 448 hook_res == EVRPC_CONTINUE || 449 hook_res == EVRPC_PAUSE); 450 } 451 } 452 453 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 454 return; 455 456 error: 457 evrpc_reqstate_free_(rpc_state); 458 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 459 return; 460 } 461 462 void * 463 evrpc_get_request(struct evrpc_req_generic *req) 464 { 465 return req->request; 466 } 467 468 void * 469 evrpc_get_reply(struct evrpc_req_generic *req) 470 { 471 return req->reply; 472 } 473 474 static void 475 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 476 { 477 struct evrpc_req_generic *rpc_state = arg; 478 struct evhttp_request *req; 479 EVUTIL_ASSERT(rpc_state); 480 req = rpc_state->http_req; 481 482 if (hook_res == EVRPC_TERMINATE) 483 goto error; 484 485 /* on success, we are going to transmit marshaled binary data */ 486 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 487 evhttp_add_header(req->output_headers, 488 "Content-Type", "application/octet-stream"); 489 } 490 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 491 492 evrpc_reqstate_free_(rpc_state); 493 494 return; 495 496 error: 497 evrpc_reqstate_free_(rpc_state); 498 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 499 return; 500 } 501 502 503 /* Client implementation of RPC site */ 504 505 static int evrpc_schedule_request(struct evhttp_connection *connection, 506 struct evrpc_request_wrapper *ctx); 507 508 struct evrpc_pool * 509 evrpc_pool_new(struct event_base *base) 510 { 511 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 512 if (pool == NULL) 513 return (NULL); 514 515 TAILQ_INIT(&pool->connections); 516 TAILQ_INIT(&pool->requests); 517 518 TAILQ_INIT(&pool->paused_requests); 519 520 TAILQ_INIT(&pool->input_hooks); 521 TAILQ_INIT(&pool->output_hooks); 522 523 pool->base = base; 524 pool->timeout = -1; 525 526 return (pool); 527 } 528 529 static void 530 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 531 { 532 if (request->hook_meta != NULL) 533 evrpc_hook_context_free_(request->hook_meta); 534 mm_free(request->name); 535 mm_free(request); 536 } 537 538 void 539 evrpc_pool_free(struct evrpc_pool *pool) 540 { 541 struct evhttp_connection *connection; 542 struct evrpc_request_wrapper *request; 543 struct evrpc_hook_ctx *pause; 544 struct evrpc_hook *hook; 545 int r; 546 547 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 548 TAILQ_REMOVE(&pool->requests, request, next); 549 evrpc_request_wrapper_free(request); 550 } 551 552 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 553 TAILQ_REMOVE(&pool->paused_requests, pause, next); 554 mm_free(pause); 555 } 556 557 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 558 TAILQ_REMOVE(&pool->connections, connection, next); 559 evhttp_connection_free(connection); 560 } 561 562 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 563 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 564 EVUTIL_ASSERT(r); 565 } 566 567 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 568 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 569 EVUTIL_ASSERT(r); 570 } 571 572 mm_free(pool); 573 } 574 575 /* 576 * Add a connection to the RPC pool. A request scheduled on the pool 577 * may use any available connection. 578 */ 579 580 void 581 evrpc_pool_add_connection(struct evrpc_pool *pool, 582 struct evhttp_connection *connection) 583 { 584 EVUTIL_ASSERT(connection->http_server == NULL); 585 TAILQ_INSERT_TAIL(&pool->connections, connection, next); 586 587 /* 588 * associate an event base with this connection 589 */ 590 if (pool->base != NULL) 591 evhttp_connection_set_base(connection, pool->base); 592 593 /* 594 * unless a timeout was specifically set for a connection, 595 * the connection inherits the timeout from the pool. 596 */ 597 if (!evutil_timerisset(&connection->timeout)) 598 evhttp_connection_set_timeout(connection, pool->timeout); 599 600 /* 601 * if we have any requests pending, schedule them with the new 602 * connections. 603 */ 604 605 if (TAILQ_FIRST(&pool->requests) != NULL) { 606 struct evrpc_request_wrapper *request = 607 TAILQ_FIRST(&pool->requests); 608 TAILQ_REMOVE(&pool->requests, request, next); 609 evrpc_schedule_request(connection, request); 610 } 611 } 612 613 void 614 evrpc_pool_remove_connection(struct evrpc_pool *pool, 615 struct evhttp_connection *connection) 616 { 617 TAILQ_REMOVE(&pool->connections, connection, next); 618 } 619 620 void 621 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 622 { 623 struct evhttp_connection *evcon; 624 TAILQ_FOREACH(evcon, &pool->connections, next) { 625 evhttp_connection_set_timeout(evcon, timeout_in_secs); 626 } 627 pool->timeout = timeout_in_secs; 628 } 629 630 631 static void evrpc_reply_done(struct evhttp_request *, void *); 632 static void evrpc_request_timeout(evutil_socket_t, short, void *); 633 634 /* 635 * Finds a connection object associated with the pool that is currently 636 * idle and can be used to make a request. 637 */ 638 static struct evhttp_connection * 639 evrpc_pool_find_connection(struct evrpc_pool *pool) 640 { 641 struct evhttp_connection *connection; 642 TAILQ_FOREACH(connection, &pool->connections, next) { 643 if (TAILQ_FIRST(&connection->requests) == NULL) 644 return (connection); 645 } 646 647 return (NULL); 648 } 649 650 /* 651 * Prototypes responsible for evrpc scheduling and hooking 652 */ 653 654 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 655 656 /* 657 * We assume that the ctx is no longer queued on the pool. 658 */ 659 static int 660 evrpc_schedule_request(struct evhttp_connection *connection, 661 struct evrpc_request_wrapper *ctx) 662 { 663 struct evhttp_request *req = NULL; 664 struct evrpc_pool *pool = ctx->pool; 665 struct evrpc_status status; 666 667 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 668 goto error; 669 670 /* serialize the request data into the output buffer */ 671 ctx->request_marshal(req->output_buffer, ctx->request); 672 673 /* we need to know the connection that we might have to abort */ 674 ctx->evcon = connection; 675 676 /* if we get paused we also need to know the request */ 677 ctx->req = req; 678 679 if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 680 int hook_res; 681 682 evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 683 684 /* apply hooks to the outgoing request */ 685 hook_res = evrpc_process_hooks(&pool->output_hooks, 686 ctx, req, req->output_buffer); 687 688 switch (hook_res) { 689 case EVRPC_TERMINATE: 690 goto error; 691 case EVRPC_PAUSE: 692 /* we need to be explicitly resumed */ 693 if (evrpc_pause_request(pool, ctx, 694 evrpc_schedule_request_closure) == -1) 695 goto error; 696 return (0); 697 case EVRPC_CONTINUE: 698 /* we can just continue */ 699 break; 700 default: 701 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 702 hook_res == EVRPC_CONTINUE || 703 hook_res == EVRPC_PAUSE); 704 } 705 } 706 707 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 708 return (0); 709 710 error: 711 memset(&status, 0, sizeof(status)); 712 status.error = EVRPC_STATUS_ERR_UNSTARTED; 713 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 714 evrpc_request_wrapper_free(ctx); 715 return (-1); 716 } 717 718 static void 719 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 720 { 721 struct evrpc_request_wrapper *ctx = arg; 722 struct evhttp_connection *connection = ctx->evcon; 723 struct evhttp_request *req = ctx->req; 724 struct evrpc_pool *pool = ctx->pool; 725 struct evrpc_status status; 726 char *uri = NULL; 727 int res = 0; 728 729 if (hook_res == EVRPC_TERMINATE) 730 goto error; 731 732 uri = evrpc_construct_uri(ctx->name); 733 if (uri == NULL) 734 goto error; 735 736 if (pool->timeout > 0) { 737 /* 738 * a timeout after which the whole rpc is going to be aborted. 739 */ 740 struct timeval tv; 741 evutil_timerclear(&tv); 742 tv.tv_sec = pool->timeout; 743 evtimer_add(&ctx->ev_timeout, &tv); 744 } 745 746 /* start the request over the connection */ 747 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 748 mm_free(uri); 749 750 if (res == -1) 751 goto error; 752 753 return; 754 755 error: 756 memset(&status, 0, sizeof(status)); 757 status.error = EVRPC_STATUS_ERR_UNSTARTED; 758 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 759 evrpc_request_wrapper_free(ctx); 760 } 761 762 /* we just queue the paused request on the pool under the req object */ 763 static int 764 evrpc_pause_request(void *vbase, void *ctx, 765 void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 766 { 767 struct evrpc_hooks_ *base = vbase; 768 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 769 if (pause == NULL) 770 return (-1); 771 772 pause->ctx = ctx; 773 pause->cb = cb; 774 775 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 776 return (0); 777 } 778 779 int 780 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 781 { 782 struct evrpc_hooks_ *base = vbase; 783 struct evrpc_pause_list *head = &base->pause_requests; 784 struct evrpc_hook_ctx *pause; 785 786 TAILQ_FOREACH(pause, head, next) { 787 if (pause->ctx == ctx) 788 break; 789 } 790 791 if (pause == NULL) 792 return (-1); 793 794 (*pause->cb)(pause->ctx, res); 795 TAILQ_REMOVE(head, pause, next); 796 mm_free(pause); 797 return (0); 798 } 799 800 int 801 evrpc_make_request(struct evrpc_request_wrapper *ctx) 802 { 803 struct evrpc_pool *pool = ctx->pool; 804 805 /* initialize the event structure for this rpc */ 806 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 807 808 /* we better have some available connections on the pool */ 809 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 810 811 /* 812 * if no connection is available, we queue the request on the pool, 813 * the next time a connection is empty, the rpc will be send on that. 814 */ 815 TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 816 817 evrpc_pool_schedule(pool); 818 819 return (0); 820 } 821 822 823 struct evrpc_request_wrapper * 824 evrpc_make_request_ctx( 825 struct evrpc_pool *pool, void *request, void *reply, 826 const char *rpcname, 827 void (*req_marshal)(struct evbuffer*, void *), 828 void (*rpl_clear)(void *), 829 int (*rpl_unmarshal)(void *, struct evbuffer *), 830 void (*cb)(struct evrpc_status *, void *, void *, void *), 831 void *cbarg) 832 { 833 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 834 mm_malloc(sizeof(struct evrpc_request_wrapper)); 835 if (ctx == NULL) 836 return (NULL); 837 838 ctx->pool = pool; 839 ctx->hook_meta = NULL; 840 ctx->evcon = NULL; 841 ctx->name = mm_strdup(rpcname); 842 if (ctx->name == NULL) { 843 mm_free(ctx); 844 return (NULL); 845 } 846 ctx->cb = cb; 847 ctx->cb_arg = cbarg; 848 ctx->request = request; 849 ctx->reply = reply; 850 ctx->request_marshal = req_marshal; 851 ctx->reply_clear = rpl_clear; 852 ctx->reply_unmarshal = rpl_unmarshal; 853 854 return (ctx); 855 } 856 857 static void 858 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 859 860 static void 861 evrpc_reply_done(struct evhttp_request *req, void *arg) 862 { 863 struct evrpc_request_wrapper *ctx = arg; 864 struct evrpc_pool *pool = ctx->pool; 865 int hook_res = EVRPC_CONTINUE; 866 867 /* cancel any timeout we might have scheduled */ 868 event_del(&ctx->ev_timeout); 869 870 ctx->req = req; 871 872 /* we need to get the reply now */ 873 if (req == NULL) { 874 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 875 return; 876 } 877 878 if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 879 evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 880 881 /* apply hooks to the incoming request */ 882 hook_res = evrpc_process_hooks(&pool->input_hooks, 883 ctx, req, req->input_buffer); 884 885 switch (hook_res) { 886 case EVRPC_TERMINATE: 887 case EVRPC_CONTINUE: 888 break; 889 case EVRPC_PAUSE: 890 /* 891 * if we get paused we also need to know the 892 * request. unfortunately, the underlying 893 * layer is going to free it. we need to 894 * request ownership explicitly 895 */ 896 if (req != NULL) 897 evhttp_request_own(req); 898 899 evrpc_pause_request(pool, ctx, 900 evrpc_reply_done_closure); 901 return; 902 default: 903 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 904 hook_res == EVRPC_CONTINUE || 905 hook_res == EVRPC_PAUSE); 906 } 907 } 908 909 evrpc_reply_done_closure(ctx, hook_res); 910 911 /* http request is being freed by underlying layer */ 912 } 913 914 static void 915 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 916 { 917 struct evrpc_request_wrapper *ctx = arg; 918 struct evhttp_request *req = ctx->req; 919 struct evrpc_pool *pool = ctx->pool; 920 struct evrpc_status status; 921 int res = -1; 922 923 memset(&status, 0, sizeof(status)); 924 status.http_req = req; 925 926 /* we need to get the reply now */ 927 if (req == NULL) { 928 status.error = EVRPC_STATUS_ERR_TIMEOUT; 929 } else if (hook_res == EVRPC_TERMINATE) { 930 status.error = EVRPC_STATUS_ERR_HOOKABORTED; 931 } else { 932 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 933 if (res == -1) 934 status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 935 } 936 937 if (res == -1) { 938 /* clear everything that we might have written previously */ 939 ctx->reply_clear(ctx->reply); 940 } 941 942 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 943 944 evrpc_request_wrapper_free(ctx); 945 946 /* the http layer owned the original request structure, but if we 947 * got paused, we asked for ownership and need to free it here. */ 948 if (req != NULL && evhttp_request_is_owned(req)) 949 evhttp_request_free(req); 950 951 /* see if we can schedule another request */ 952 evrpc_pool_schedule(pool); 953 } 954 955 static void 956 evrpc_pool_schedule(struct evrpc_pool *pool) 957 { 958 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 959 struct evhttp_connection *evcon; 960 961 /* if no requests are pending, we have no work */ 962 if (ctx == NULL) 963 return; 964 965 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 966 TAILQ_REMOVE(&pool->requests, ctx, next); 967 evrpc_schedule_request(evcon, ctx); 968 } 969 } 970 971 static void 972 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 973 { 974 struct evrpc_request_wrapper *ctx = arg; 975 struct evhttp_connection *evcon = ctx->evcon; 976 EVUTIL_ASSERT(evcon != NULL); 977 978 evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 979 } 980 981 /* 982 * frees potential meta data associated with a request. 983 */ 984 985 static void 986 evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 987 { 988 struct evrpc_meta *entry; 989 EVUTIL_ASSERT(meta_data != NULL); 990 991 while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 992 TAILQ_REMOVE(meta_data, entry, next); 993 mm_free(entry->key); 994 mm_free(entry->data); 995 mm_free(entry); 996 } 997 } 998 999 static struct evrpc_hook_meta * 1000 evrpc_hook_meta_new_(void) 1001 { 1002 struct evrpc_hook_meta *ctx; 1003 ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1004 EVUTIL_ASSERT(ctx != NULL); 1005 1006 TAILQ_INIT(&ctx->meta_data); 1007 ctx->evcon = NULL; 1008 1009 return (ctx); 1010 } 1011 1012 static void 1013 evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 1014 struct evhttp_connection *evcon) 1015 { 1016 struct evrpc_hook_meta *ctx = *pctx; 1017 if (ctx == NULL) 1018 *pctx = ctx = evrpc_hook_meta_new_(); 1019 ctx->evcon = evcon; 1020 } 1021 1022 static void 1023 evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 1024 { 1025 evrpc_meta_data_free(&ctx->meta_data); 1026 mm_free(ctx); 1027 } 1028 1029 /* Adds meta data */ 1030 void 1031 evrpc_hook_add_meta(void *ctx, const char *key, 1032 const void *data, size_t data_size) 1033 { 1034 struct evrpc_request_wrapper *req = ctx; 1035 struct evrpc_hook_meta *store = NULL; 1036 struct evrpc_meta *meta = NULL; 1037 1038 if ((store = req->hook_meta) == NULL) 1039 store = req->hook_meta = evrpc_hook_meta_new_(); 1040 1041 meta = mm_malloc(sizeof(struct evrpc_meta)); 1042 EVUTIL_ASSERT(meta != NULL); 1043 meta->key = mm_strdup(key); 1044 EVUTIL_ASSERT(meta->key != NULL); 1045 meta->data_size = data_size; 1046 meta->data = mm_malloc(data_size); 1047 EVUTIL_ASSERT(meta->data != NULL); 1048 memcpy(meta->data, data, data_size); 1049 1050 TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1051 } 1052 1053 int 1054 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1055 { 1056 struct evrpc_request_wrapper *req = ctx; 1057 struct evrpc_meta *meta = NULL; 1058 1059 if (req->hook_meta == NULL) 1060 return (-1); 1061 1062 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1063 if (strcmp(meta->key, key) == 0) { 1064 *data = meta->data; 1065 *data_size = meta->data_size; 1066 return (0); 1067 } 1068 } 1069 1070 return (-1); 1071 } 1072 1073 struct evhttp_connection * 1074 evrpc_hook_get_connection(void *ctx) 1075 { 1076 struct evrpc_request_wrapper *req = ctx; 1077 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1078 } 1079 1080 int 1081 evrpc_send_request_generic(struct evrpc_pool *pool, 1082 void *request, void *reply, 1083 void (*cb)(struct evrpc_status *, void *, void *, void *), 1084 void *cb_arg, 1085 const char *rpcname, 1086 void (*req_marshal)(struct evbuffer *, void *), 1087 void (*rpl_clear)(void *), 1088 int (*rpl_unmarshal)(void *, struct evbuffer *)) 1089 { 1090 struct evrpc_status status; 1091 struct evrpc_request_wrapper *ctx; 1092 ctx = evrpc_make_request_ctx(pool, request, reply, 1093 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1094 if (ctx == NULL) 1095 goto error; 1096 return (evrpc_make_request(ctx)); 1097 error: 1098 memset(&status, 0, sizeof(status)); 1099 status.error = EVRPC_STATUS_ERR_UNSTARTED; 1100 (*(cb))(&status, request, reply, cb_arg); 1101 return (-1); 1102 } 1103 1104 /** Takes a request object and fills it in with the right magic */ 1105 static struct evrpc * 1106 evrpc_register_object(const char *name, 1107 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1108 int (*req_unmarshal)(void *, struct evbuffer *), 1109 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1110 int (*rpl_complete)(void *), 1111 void (*rpl_marshal)(struct evbuffer *, void *)) 1112 { 1113 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1114 if (rpc == NULL) 1115 return (NULL); 1116 rpc->uri = mm_strdup(name); 1117 if (rpc->uri == NULL) { 1118 mm_free(rpc); 1119 return (NULL); 1120 } 1121 rpc->request_new = req_new; 1122 rpc->request_new_arg = req_new_arg; 1123 rpc->request_free = req_free; 1124 rpc->request_unmarshal = req_unmarshal; 1125 rpc->reply_new = rpl_new; 1126 rpc->reply_new_arg = rpl_new_arg; 1127 rpc->reply_free = rpl_free; 1128 rpc->reply_complete = rpl_complete; 1129 rpc->reply_marshal = rpl_marshal; 1130 return (rpc); 1131 } 1132 1133 int 1134 evrpc_register_generic(struct evrpc_base *base, const char *name, 1135 void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1136 void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1137 int (*req_unmarshal)(void *, struct evbuffer *), 1138 void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1139 int (*rpl_complete)(void *), 1140 void (*rpl_marshal)(struct evbuffer *, void *)) 1141 { 1142 struct evrpc* rpc = 1143 evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1144 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1145 if (rpc == NULL) 1146 return (-1); 1147 evrpc_register_rpc(base, rpc, 1148 (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1149 return (0); 1150 } 1151 1152 /** accessors for obscure and undocumented functionality */ 1153 struct evrpc_pool * 1154 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1155 { 1156 return (ctx->pool); 1157 } 1158 1159 void 1160 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1161 struct evrpc_pool *pool) 1162 { 1163 ctx->pool = pool; 1164 } 1165 1166 void 1167 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1168 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1169 void *cb_arg) 1170 { 1171 ctx->cb = cb; 1172 ctx->cb_arg = cb_arg; 1173 } 1174