1 /* $NetBSD: evrpc.c,v 1.4 2017/01/31 23:17:39 christos Exp $ */ 2 /* 3 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 4 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 #include "event2/event-config.h" 29 #include <sys/cdefs.h> 30 __RCSID("$NetBSD: evrpc.c,v 1.4 2017/01/31 23:17:39 christos Exp $"); 31 #include "evconfig-private.h" 32 33 #ifdef _WIN32 34 #define WIN32_LEAN_AND_MEAN 35 #include <winsock2.h> 36 #include <windows.h> 37 #undef WIN32_LEAN_AND_MEAN 38 #endif 39 40 #include <sys/types.h> 41 #ifndef _WIN32 42 #include <sys/socket.h> 43 #endif 44 #ifdef EVENT__HAVE_SYS_TIME_H 45 #include <sys/time.h> 46 #endif 47 #include <sys/queue.h> 48 #include <stdio.h> 49 #include <stdlib.h> 50 #ifndef _WIN32 51 #include <unistd.h> 52 #endif 53 #include <errno.h> 54 #include <signal.h> 55 #include <string.h> 56 57 #include <sys/queue.h> 58 59 #include "event2/event.h" 60 #include "event2/event_struct.h" 61 #include "event2/rpc.h" 62 #include "event2/rpc_struct.h" 63 #include "evrpc-internal.h" 64 #include "event2/http.h" 65 #include "event2/buffer.h" 66 #include "event2/tag.h" 67 #include "event2/http_struct.h" 68 #include "event2/http_compat.h" 69 #include "event2/util.h" 70 #include "util-internal.h" 71 #include "log-internal.h" 72 #include "mm-internal.h" 73 74 struct evrpc_base * 75 evrpc_init(struct evhttp *http_server) 76 { 77 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 78 if (base == NULL) 79 return (NULL); 80 81 /* we rely on the tagging sub system */ 82 evtag_init(); 83 84 TAILQ_INIT(&base->registered_rpcs); 85 TAILQ_INIT(&base->input_hooks); 86 TAILQ_INIT(&base->output_hooks); 87 88 TAILQ_INIT(&base->paused_requests); 89 90 base->http_server = http_server; 91 92 return (base); 93 } 94 95 void 96 evrpc_free(struct evrpc_base *base) 97 { 98 struct evrpc *rpc; 99 struct evrpc_hook *hook; 100 struct evrpc_hook_ctx *pause; 101 int r; 102 103 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 104 r = evrpc_unregister_rpc(base, rpc->uri); 105 EVUTIL_ASSERT(r == 0); 106 } 107 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 108 TAILQ_REMOVE(&base->paused_requests, pause, next); 109 mm_free(pause); 110 } 111 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 112 r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 113 EVUTIL_ASSERT(r); 114 } 115 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 116 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 117 EVUTIL_ASSERT(r); 118 } 119 mm_free(base); 120 } 121 122 void * 123 evrpc_add_hook(void *vbase, 124 enum EVRPC_HOOK_TYPE hook_type, 125 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 126 void *cb_arg) 127 { 128 struct evrpc_hooks_ *base = vbase; 129 struct evrpc_hook_list *head = NULL; 130 struct evrpc_hook *hook = NULL; 131 switch (hook_type) { 132 case EVRPC_INPUT: 133 head = &base->in_hooks; 134 break; 135 case EVRPC_OUTPUT: 136 head = &base->out_hooks; 137 break; 138 default: 139 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 140 } 141 142 hook = mm_calloc(1, sizeof(struct evrpc_hook)); 143 EVUTIL_ASSERT(hook != NULL); 144 145 hook->process = cb; 146 hook->process_arg = cb_arg; 147 TAILQ_INSERT_TAIL(head, hook, next); 148 149 return (hook); 150 } 151 152 static int 153 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 154 { 155 struct evrpc_hook *hook = NULL; 156 TAILQ_FOREACH(hook, head, next) { 157 if (hook == handle) { 158 TAILQ_REMOVE(head, hook, next); 159 mm_free(hook); 160 return (1); 161 } 162 } 163 164 return (0); 165 } 166 167 /* 168 * remove the hook specified by the handle 169 */ 170 171 int 172 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 173 { 174 struct evrpc_hooks_ *base = vbase; 175 struct evrpc_hook_list *head = NULL; 176 switch (hook_type) { 177 case EVRPC_INPUT: 178 head = &base->in_hooks; 179 break; 180 case EVRPC_OUTPUT: 181 head = &base->out_hooks; 182 break; 183 default: 184 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 185 } 186 187 return (evrpc_remove_hook_internal(head, handle)); 188 } 189 190 static int 191 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 192 struct evhttp_request *req, struct evbuffer *evbuf) 193 { 194 struct evrpc_hook *hook; 195 TAILQ_FOREACH(hook, head, next) { 196 int res = hook->process(ctx, req, evbuf, hook->process_arg); 197 if (res != EVRPC_CONTINUE) 198 return (res); 199 } 200 201 return (EVRPC_CONTINUE); 202 } 203 204 static void evrpc_pool_schedule(struct evrpc_pool *pool); 205 static void evrpc_request_cb(struct evhttp_request *, void *); 206 207 /* 208 * Registers a new RPC with the HTTP server. The evrpc object is expected 209 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 210 * calls this function. 211 */ 212 213 static char * 214 evrpc_construct_uri(const char *uri) 215 { 216 char *constructed_uri; 217 size_t constructed_uri_len; 218 219 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 220 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 221 event_err(1, "%s: failed to register rpc at %s", 222 __func__, uri); 223 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 224 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 225 constructed_uri[constructed_uri_len - 1] = '\0'; 226 227 return (constructed_uri); 228 } 229 230 int 231 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 232 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 233 { 234 char *constructed_uri = evrpc_construct_uri(rpc->uri); 235 236 rpc->base = base; 237 rpc->cb = cb; 238 rpc->cb_arg = cb_arg; 239 240 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 241 242 evhttp_set_cb(base->http_server, 243 constructed_uri, 244 evrpc_request_cb, 245 rpc); 246 247 mm_free(constructed_uri); 248 249 return (0); 250 } 251 252 int 253 evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 254 { 255 char *registered_uri = NULL; 256 struct evrpc *rpc; 257 int r; 258 259 /* find the right rpc; linear search might be slow */ 260 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 261 if (strcmp(rpc->uri, name) == 0) 262 break; 263 } 264 if (rpc == NULL) { 265 /* We did not find an RPC with this name */ 266 return (-1); 267 } 268 TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 269 270 registered_uri = evrpc_construct_uri(name); 271 272 /* remove the http server callback */ 273 r = evhttp_del_cb(base->http_server, registered_uri); 274 EVUTIL_ASSERT(r == 0); 275 276 mm_free(registered_uri); 277 278 mm_free(__UNCONST(rpc->uri)); 279 mm_free(rpc); 280 return (0); 281 } 282 283 static int evrpc_pause_request(void *vbase, void *ctx, 284 void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 285 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 286 287 static void 288 evrpc_request_cb(struct evhttp_request *req, void *arg) 289 { 290 struct evrpc *rpc = arg; 291 struct evrpc_req_generic *rpc_state = NULL; 292 293 /* let's verify the outside parameters */ 294 if (req->type != EVHTTP_REQ_POST || 295 evbuffer_get_length(req->input_buffer) <= 0) 296 goto error; 297 298 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 299 if (rpc_state == NULL) 300 goto error; 301 rpc_state->rpc = rpc; 302 rpc_state->http_req = req; 303 rpc_state->rpc_data = NULL; 304 305 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 306 int hook_res; 307 308 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 309 310 /* 311 * allow hooks to modify the outgoing request 312 */ 313 hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 314 rpc_state, req, req->input_buffer); 315 switch (hook_res) { 316 case EVRPC_TERMINATE: 317 goto error; 318 case EVRPC_PAUSE: 319 evrpc_pause_request(rpc->base, rpc_state, 320 evrpc_request_cb_closure); 321 return; 322 case EVRPC_CONTINUE: 323 break; 324 default: 325 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 326 hook_res == EVRPC_CONTINUE || 327 hook_res == EVRPC_PAUSE); 328 } 329 } 330 331 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 332 return; 333 334 error: 335 evrpc_reqstate_free_(rpc_state); 336 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 337 return; 338 } 339 340 static void 341 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 342 { 343 struct evrpc_req_generic *rpc_state = arg; 344 struct evrpc *rpc; 345 struct evhttp_request *req; 346 347 EVUTIL_ASSERT(rpc_state); 348 rpc = rpc_state->rpc; 349 req = rpc_state->http_req; 350 351 if (hook_res == EVRPC_TERMINATE) 352 goto error; 353 354 /* let's check that we can parse the request */ 355 rpc_state->request = rpc->request_new(rpc->request_new_arg); 356 if (rpc_state->request == NULL) 357 goto error; 358 359 if (rpc->request_unmarshal( 360 rpc_state->request, req->input_buffer) == -1) { 361 /* we failed to parse the request; that's a bummer */ 362 goto error; 363 } 364 365 /* at this point, we have a well formed request, prepare the reply */ 366 367 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 368 if (rpc_state->reply == NULL) 369 goto error; 370 371 /* give the rpc to the user; they can deal with it */ 372 rpc->cb(rpc_state, rpc->cb_arg); 373 374 return; 375 376 error: 377 evrpc_reqstate_free_(rpc_state); 378 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 379 return; 380 } 381 382 383 void 384 evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 385 { 386 struct evrpc *rpc; 387 EVUTIL_ASSERT(rpc_state != NULL); 388 rpc = rpc_state->rpc; 389 390 /* clean up all memory */ 391 if (rpc_state->hook_meta != NULL) 392 evrpc_hook_context_free_(rpc_state->hook_meta); 393 if (rpc_state->request != NULL) 394 rpc->request_free(rpc_state->request); 395 if (rpc_state->reply != NULL) 396 rpc->reply_free(rpc_state->reply); 397 if (rpc_state->rpc_data != NULL) 398 evbuffer_free(rpc_state->rpc_data); 399 mm_free(rpc_state); 400 } 401 402 static void 403 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 404 405 void 406 evrpc_request_done(struct evrpc_req_generic *rpc_state) 407 { 408 struct evhttp_request *req; 409 struct evrpc *rpc; 410 411 EVUTIL_ASSERT(rpc_state); 412 413 req = rpc_state->http_req; 414 rpc = rpc_state->rpc; 415 416 if (rpc->reply_complete(rpc_state->reply) == -1) { 417 /* the reply was not completely filled in. error out */ 418 goto error; 419 } 420 421 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 422 /* out of memory */ 423 goto error; 424 } 425 426 /* serialize the reply */ 427 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 428 429 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 430 int hook_res; 431 432 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 433 434 /* do hook based tweaks to the request */ 435 hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 436 rpc_state, req, rpc_state->rpc_data); 437 switch (hook_res) { 438 case EVRPC_TERMINATE: 439 goto error; 440 case EVRPC_PAUSE: 441 if (evrpc_pause_request(rpc->base, rpc_state, 442 evrpc_request_done_closure) == -1) 443 goto error; 444 return; 445 case EVRPC_CONTINUE: 446 break; 447 default: 448 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 449 hook_res == EVRPC_CONTINUE || 450 hook_res == EVRPC_PAUSE); 451 } 452 } 453 454 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 455 return; 456 457 error: 458 evrpc_reqstate_free_(rpc_state); 459 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 460 return; 461 } 462 463 void * 464 evrpc_get_request(struct evrpc_req_generic *req) 465 { 466 return req->request; 467 } 468 469 void * 470 evrpc_get_reply(struct evrpc_req_generic *req) 471 { 472 return req->reply; 473 } 474 475 static void 476 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 477 { 478 struct evrpc_req_generic *rpc_state = arg; 479 struct evhttp_request *req; 480 EVUTIL_ASSERT(rpc_state); 481 req = rpc_state->http_req; 482 483 if (hook_res == EVRPC_TERMINATE) 484 goto error; 485 486 /* on success, we are going to transmit marshaled binary data */ 487 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 488 evhttp_add_header(req->output_headers, 489 "Content-Type", "application/octet-stream"); 490 } 491 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 492 493 evrpc_reqstate_free_(rpc_state); 494 495 return; 496 497 error: 498 evrpc_reqstate_free_(rpc_state); 499 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 500 return; 501 } 502 503 504 /* Client implementation of RPC site */ 505 506 static int evrpc_schedule_request(struct evhttp_connection *connection, 507 struct evrpc_request_wrapper *ctx); 508 509 struct evrpc_pool * 510 evrpc_pool_new(struct event_base *base) 511 { 512 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 513 if (pool == NULL) 514 return (NULL); 515 516 TAILQ_INIT(&pool->connections); 517 TAILQ_INIT(&pool->requests); 518 519 TAILQ_INIT(&pool->paused_requests); 520 521 TAILQ_INIT(&pool->input_hooks); 522 TAILQ_INIT(&pool->output_hooks); 523 524 pool->base = base; 525 pool->timeout = -1; 526 527 return (pool); 528 } 529 530 static void 531 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 532 { 533 if (request->hook_meta != NULL) 534 evrpc_hook_context_free_(request->hook_meta); 535 mm_free(request->name); 536 mm_free(request); 537 } 538 539 void 540 evrpc_pool_free(struct evrpc_pool *pool) 541 { 542 struct evhttp_connection *connection; 543 struct evrpc_request_wrapper *request; 544 struct evrpc_hook_ctx *pause; 545 struct evrpc_hook *hook; 546 int r; 547 548 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 549 TAILQ_REMOVE(&pool->requests, request, next); 550 evrpc_request_wrapper_free(request); 551 } 552 553 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 554 TAILQ_REMOVE(&pool->paused_requests, pause, next); 555 mm_free(pause); 556 } 557 558 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 559 TAILQ_REMOVE(&pool->connections, connection, next); 560 evhttp_connection_free(connection); 561 } 562 563 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 564 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 565 EVUTIL_ASSERT(r); 566 } 567 568 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 569 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 570 EVUTIL_ASSERT(r); 571 } 572 573 mm_free(pool); 574 } 575 576 /* 577 * Add a connection to the RPC pool. A request scheduled on the pool 578 * may use any available connection. 579 */ 580 581 void 582 evrpc_pool_add_connection(struct evrpc_pool *pool, 583 struct evhttp_connection *connection) 584 { 585 EVUTIL_ASSERT(connection->http_server == NULL); 586 TAILQ_INSERT_TAIL(&pool->connections, connection, next); 587 588 /* 589 * associate an event base with this connection 590 */ 591 if (pool->base != NULL) 592 evhttp_connection_set_base(connection, pool->base); 593 594 /* 595 * unless a timeout was specifically set for a connection, 596 * the connection inherits the timeout from the pool. 597 */ 598 if (!evutil_timerisset(&connection->timeout)) 599 evhttp_connection_set_timeout(connection, pool->timeout); 600 601 /* 602 * if we have any requests pending, schedule them with the new 603 * connections. 604 */ 605 606 if (TAILQ_FIRST(&pool->requests) != NULL) { 607 struct evrpc_request_wrapper *request = 608 TAILQ_FIRST(&pool->requests); 609 TAILQ_REMOVE(&pool->requests, request, next); 610 evrpc_schedule_request(connection, request); 611 } 612 } 613 614 void 615 evrpc_pool_remove_connection(struct evrpc_pool *pool, 616 struct evhttp_connection *connection) 617 { 618 TAILQ_REMOVE(&pool->connections, connection, next); 619 } 620 621 void 622 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 623 { 624 struct evhttp_connection *evcon; 625 TAILQ_FOREACH(evcon, &pool->connections, next) { 626 evhttp_connection_set_timeout(evcon, timeout_in_secs); 627 } 628 pool->timeout = timeout_in_secs; 629 } 630 631 632 static void evrpc_reply_done(struct evhttp_request *, void *); 633 static void evrpc_request_timeout(evutil_socket_t, short, void *); 634 635 /* 636 * Finds a connection object associated with the pool that is currently 637 * idle and can be used to make a request. 638 */ 639 static struct evhttp_connection * 640 evrpc_pool_find_connection(struct evrpc_pool *pool) 641 { 642 struct evhttp_connection *connection; 643 TAILQ_FOREACH(connection, &pool->connections, next) { 644 if (TAILQ_FIRST(&connection->requests) == NULL) 645 return (connection); 646 } 647 648 return (NULL); 649 } 650 651 /* 652 * Prototypes responsible for evrpc scheduling and hooking 653 */ 654 655 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 656 657 /* 658 * We assume that the ctx is no longer queued on the pool. 659 */ 660 static int 661 evrpc_schedule_request(struct evhttp_connection *connection, 662 struct evrpc_request_wrapper *ctx) 663 { 664 struct evhttp_request *req = NULL; 665 struct evrpc_pool *pool = ctx->pool; 666 struct evrpc_status status; 667 668 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 669 goto error; 670 671 /* serialize the request data into the output buffer */ 672 ctx->request_marshal(req->output_buffer, ctx->request); 673 674 /* we need to know the connection that we might have to abort */ 675 ctx->evcon = connection; 676 677 /* if we get paused we also need to know the request */ 678 ctx->req = req; 679 680 if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 681 int hook_res; 682 683 evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 684 685 /* apply hooks to the outgoing request */ 686 hook_res = evrpc_process_hooks(&pool->output_hooks, 687 ctx, req, req->output_buffer); 688 689 switch (hook_res) { 690 case EVRPC_TERMINATE: 691 goto error; 692 case EVRPC_PAUSE: 693 /* we need to be explicitly resumed */ 694 if (evrpc_pause_request(pool, ctx, 695 evrpc_schedule_request_closure) == -1) 696 goto error; 697 return (0); 698 case EVRPC_CONTINUE: 699 /* we can just continue */ 700 break; 701 default: 702 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 703 hook_res == EVRPC_CONTINUE || 704 hook_res == EVRPC_PAUSE); 705 } 706 } 707 708 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 709 return (0); 710 711 error: 712 memset(&status, 0, sizeof(status)); 713 status.error = EVRPC_STATUS_ERR_UNSTARTED; 714 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 715 evrpc_request_wrapper_free(ctx); 716 return (-1); 717 } 718 719 static void 720 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 721 { 722 struct evrpc_request_wrapper *ctx = arg; 723 struct evhttp_connection *connection = ctx->evcon; 724 struct evhttp_request *req = ctx->req; 725 struct evrpc_pool *pool = ctx->pool; 726 struct evrpc_status status; 727 char *uri = NULL; 728 int res = 0; 729 730 if (hook_res == EVRPC_TERMINATE) 731 goto error; 732 733 uri = evrpc_construct_uri(ctx->name); 734 if (uri == NULL) 735 goto error; 736 737 if (pool->timeout > 0) { 738 /* 739 * a timeout after which the whole rpc is going to be aborted. 740 */ 741 struct timeval tv; 742 evutil_timerclear(&tv); 743 tv.tv_sec = pool->timeout; 744 evtimer_add(&ctx->ev_timeout, &tv); 745 } 746 747 /* start the request over the connection */ 748 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 749 mm_free(uri); 750 751 if (res == -1) 752 goto error; 753 754 return; 755 756 error: 757 memset(&status, 0, sizeof(status)); 758 status.error = EVRPC_STATUS_ERR_UNSTARTED; 759 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 760 evrpc_request_wrapper_free(ctx); 761 } 762 763 /* we just queue the paused request on the pool under the req object */ 764 static int 765 evrpc_pause_request(void *vbase, void *ctx, 766 void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 767 { 768 struct evrpc_hooks_ *base = vbase; 769 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 770 if (pause == NULL) 771 return (-1); 772 773 pause->ctx = ctx; 774 pause->cb = cb; 775 776 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 777 return (0); 778 } 779 780 int 781 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 782 { 783 struct evrpc_hooks_ *base = vbase; 784 struct evrpc_pause_list *head = &base->pause_requests; 785 struct evrpc_hook_ctx *pause; 786 787 TAILQ_FOREACH(pause, head, next) { 788 if (pause->ctx == ctx) 789 break; 790 } 791 792 if (pause == NULL) 793 return (-1); 794 795 (*pause->cb)(pause->ctx, res); 796 TAILQ_REMOVE(head, pause, next); 797 mm_free(pause); 798 return (0); 799 } 800 801 int 802 evrpc_make_request(struct evrpc_request_wrapper *ctx) 803 { 804 struct evrpc_pool *pool = ctx->pool; 805 806 /* initialize the event structure for this rpc */ 807 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 808 809 /* we better have some available connections on the pool */ 810 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 811 812 /* 813 * if no connection is available, we queue the request on the pool, 814 * the next time a connection is empty, the rpc will be send on that. 815 */ 816 TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 817 818 evrpc_pool_schedule(pool); 819 820 return (0); 821 } 822 823 824 struct evrpc_request_wrapper * 825 evrpc_make_request_ctx( 826 struct evrpc_pool *pool, void *request, void *reply, 827 const char *rpcname, 828 void (*req_marshal)(struct evbuffer*, void *), 829 void (*rpl_clear)(void *), 830 int (*rpl_unmarshal)(void *, struct evbuffer *), 831 void (*cb)(struct evrpc_status *, void *, void *, void *), 832 void *cbarg) 833 { 834 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 835 mm_malloc(sizeof(struct evrpc_request_wrapper)); 836 if (ctx == NULL) 837 return (NULL); 838 839 ctx->pool = pool; 840 ctx->hook_meta = NULL; 841 ctx->evcon = NULL; 842 ctx->name = mm_strdup(rpcname); 843 if (ctx->name == NULL) { 844 mm_free(ctx); 845 return (NULL); 846 } 847 ctx->cb = cb; 848 ctx->cb_arg = cbarg; 849 ctx->request = request; 850 ctx->reply = reply; 851 ctx->request_marshal = req_marshal; 852 ctx->reply_clear = rpl_clear; 853 ctx->reply_unmarshal = rpl_unmarshal; 854 855 return (ctx); 856 } 857 858 static void 859 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 860 861 static void 862 evrpc_reply_done(struct evhttp_request *req, void *arg) 863 { 864 struct evrpc_request_wrapper *ctx = arg; 865 struct evrpc_pool *pool = ctx->pool; 866 int hook_res = EVRPC_CONTINUE; 867 868 /* cancel any timeout we might have scheduled */ 869 event_del(&ctx->ev_timeout); 870 871 ctx->req = req; 872 873 /* we need to get the reply now */ 874 if (req == NULL) { 875 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 876 return; 877 } 878 879 if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 880 evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 881 882 /* apply hooks to the incoming request */ 883 hook_res = evrpc_process_hooks(&pool->input_hooks, 884 ctx, req, req->input_buffer); 885 886 switch (hook_res) { 887 case EVRPC_TERMINATE: 888 case EVRPC_CONTINUE: 889 break; 890 case EVRPC_PAUSE: 891 /* 892 * if we get paused we also need to know the 893 * request. unfortunately, the underlying 894 * layer is going to free it. we need to 895 * request ownership explicitly 896 */ 897 if (req != NULL) 898 evhttp_request_own(req); 899 900 evrpc_pause_request(pool, ctx, 901 evrpc_reply_done_closure); 902 return; 903 default: 904 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 905 hook_res == EVRPC_CONTINUE || 906 hook_res == EVRPC_PAUSE); 907 } 908 } 909 910 evrpc_reply_done_closure(ctx, hook_res); 911 912 /* http request is being freed by underlying layer */ 913 } 914 915 static void 916 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 917 { 918 struct evrpc_request_wrapper *ctx = arg; 919 struct evhttp_request *req = ctx->req; 920 struct evrpc_pool *pool = ctx->pool; 921 struct evrpc_status status; 922 int res = -1; 923 924 memset(&status, 0, sizeof(status)); 925 status.http_req = req; 926 927 /* we need to get the reply now */ 928 if (req == NULL) { 929 status.error = EVRPC_STATUS_ERR_TIMEOUT; 930 } else if (hook_res == EVRPC_TERMINATE) { 931 status.error = EVRPC_STATUS_ERR_HOOKABORTED; 932 } else { 933 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 934 if (res == -1) 935 status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 936 } 937 938 if (res == -1) { 939 /* clear everything that we might have written previously */ 940 ctx->reply_clear(ctx->reply); 941 } 942 943 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 944 945 evrpc_request_wrapper_free(ctx); 946 947 /* the http layer owned the original request structure, but if we 948 * got paused, we asked for ownership and need to free it here. */ 949 if (req != NULL && evhttp_request_is_owned(req)) 950 evhttp_request_free(req); 951 952 /* see if we can schedule another request */ 953 evrpc_pool_schedule(pool); 954 } 955 956 static void 957 evrpc_pool_schedule(struct evrpc_pool *pool) 958 { 959 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 960 struct evhttp_connection *evcon; 961 962 /* if no requests are pending, we have no work */ 963 if (ctx == NULL) 964 return; 965 966 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 967 TAILQ_REMOVE(&pool->requests, ctx, next); 968 evrpc_schedule_request(evcon, ctx); 969 } 970 } 971 972 static void 973 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 974 { 975 struct evrpc_request_wrapper *ctx = arg; 976 struct evhttp_connection *evcon = ctx->evcon; 977 EVUTIL_ASSERT(evcon != NULL); 978 979 evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 980 } 981 982 /* 983 * frees potential meta data associated with a request. 984 */ 985 986 static void 987 evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 988 { 989 struct evrpc_meta *entry; 990 EVUTIL_ASSERT(meta_data != NULL); 991 992 while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 993 TAILQ_REMOVE(meta_data, entry, next); 994 mm_free(entry->key); 995 mm_free(entry->data); 996 mm_free(entry); 997 } 998 } 999 1000 static struct evrpc_hook_meta * 1001 evrpc_hook_meta_new_(void) 1002 { 1003 struct evrpc_hook_meta *ctx; 1004 ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1005 EVUTIL_ASSERT(ctx != NULL); 1006 1007 TAILQ_INIT(&ctx->meta_data); 1008 ctx->evcon = NULL; 1009 1010 return (ctx); 1011 } 1012 1013 static void 1014 evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 1015 struct evhttp_connection *evcon) 1016 { 1017 struct evrpc_hook_meta *ctx = *pctx; 1018 if (ctx == NULL) 1019 *pctx = ctx = evrpc_hook_meta_new_(); 1020 ctx->evcon = evcon; 1021 } 1022 1023 static void 1024 evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 1025 { 1026 evrpc_meta_data_free(&ctx->meta_data); 1027 mm_free(ctx); 1028 } 1029 1030 /* Adds meta data */ 1031 void 1032 evrpc_hook_add_meta(void *ctx, const char *key, 1033 const void *data, size_t data_size) 1034 { 1035 struct evrpc_request_wrapper *req = ctx; 1036 struct evrpc_hook_meta *store = NULL; 1037 struct evrpc_meta *meta = NULL; 1038 1039 if ((store = req->hook_meta) == NULL) 1040 store = req->hook_meta = evrpc_hook_meta_new_(); 1041 1042 meta = mm_malloc(sizeof(struct evrpc_meta)); 1043 EVUTIL_ASSERT(meta != NULL); 1044 meta->key = mm_strdup(key); 1045 EVUTIL_ASSERT(meta->key != NULL); 1046 meta->data_size = data_size; 1047 meta->data = mm_malloc(data_size); 1048 EVUTIL_ASSERT(meta->data != NULL); 1049 memcpy(meta->data, data, data_size); 1050 1051 TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1052 } 1053 1054 int 1055 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1056 { 1057 struct evrpc_request_wrapper *req = ctx; 1058 struct evrpc_meta *meta = NULL; 1059 1060 if (req->hook_meta == NULL) 1061 return (-1); 1062 1063 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1064 if (strcmp(meta->key, key) == 0) { 1065 *data = meta->data; 1066 *data_size = meta->data_size; 1067 return (0); 1068 } 1069 } 1070 1071 return (-1); 1072 } 1073 1074 struct evhttp_connection * 1075 evrpc_hook_get_connection(void *ctx) 1076 { 1077 struct evrpc_request_wrapper *req = ctx; 1078 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1079 } 1080 1081 int 1082 evrpc_send_request_generic(struct evrpc_pool *pool, 1083 void *request, void *reply, 1084 void (*cb)(struct evrpc_status *, void *, void *, void *), 1085 void *cb_arg, 1086 const char *rpcname, 1087 void (*req_marshal)(struct evbuffer *, void *), 1088 void (*rpl_clear)(void *), 1089 int (*rpl_unmarshal)(void *, struct evbuffer *)) 1090 { 1091 struct evrpc_status status; 1092 struct evrpc_request_wrapper *ctx; 1093 ctx = evrpc_make_request_ctx(pool, request, reply, 1094 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1095 if (ctx == NULL) 1096 goto error; 1097 return (evrpc_make_request(ctx)); 1098 error: 1099 memset(&status, 0, sizeof(status)); 1100 status.error = EVRPC_STATUS_ERR_UNSTARTED; 1101 (*(cb))(&status, request, reply, cb_arg); 1102 return (-1); 1103 } 1104 1105 /** Takes a request object and fills it in with the right magic */ 1106 static struct evrpc * 1107 evrpc_register_object(const char *name, 1108 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1109 int (*req_unmarshal)(void *, struct evbuffer *), 1110 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1111 int (*rpl_complete)(void *), 1112 void (*rpl_marshal)(struct evbuffer *, void *)) 1113 { 1114 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1115 if (rpc == NULL) 1116 return (NULL); 1117 rpc->uri = mm_strdup(name); 1118 if (rpc->uri == NULL) { 1119 mm_free(rpc); 1120 return (NULL); 1121 } 1122 rpc->request_new = req_new; 1123 rpc->request_new_arg = req_new_arg; 1124 rpc->request_free = req_free; 1125 rpc->request_unmarshal = req_unmarshal; 1126 rpc->reply_new = rpl_new; 1127 rpc->reply_new_arg = rpl_new_arg; 1128 rpc->reply_free = rpl_free; 1129 rpc->reply_complete = rpl_complete; 1130 rpc->reply_marshal = rpl_marshal; 1131 return (rpc); 1132 } 1133 1134 int 1135 evrpc_register_generic(struct evrpc_base *base, const char *name, 1136 void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1137 void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1138 int (*req_unmarshal)(void *, struct evbuffer *), 1139 void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1140 int (*rpl_complete)(void *), 1141 void (*rpl_marshal)(struct evbuffer *, void *)) 1142 { 1143 struct evrpc* rpc = 1144 evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1145 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1146 if (rpc == NULL) 1147 return (-1); 1148 evrpc_register_rpc(base, rpc, 1149 (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1150 return (0); 1151 } 1152 1153 /** accessors for obscure and undocumented functionality */ 1154 struct evrpc_pool * 1155 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1156 { 1157 return (ctx->pool); 1158 } 1159 1160 void 1161 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1162 struct evrpc_pool *pool) 1163 { 1164 ctx->pool = pool; 1165 } 1166 1167 void 1168 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1169 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1170 void *cb_arg) 1171 { 1172 ctx->cb = cb; 1173 ctx->cb_arg = cb_arg; 1174 } 1175