1*eabc0478Schristos /* $NetBSD: evrpc.c,v 1.6 2024/08/18 20:47:21 christos Exp $ */ 28585484eSchristos 38585484eSchristos /* 48585484eSchristos * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 58585484eSchristos * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 68585484eSchristos * 78585484eSchristos * Redistribution and use in source and binary forms, with or without 88585484eSchristos * modification, are permitted provided that the following conditions 98585484eSchristos * are met: 108585484eSchristos * 1. Redistributions of source code must retain the above copyright 118585484eSchristos * notice, this list of conditions and the following disclaimer. 128585484eSchristos * 2. Redistributions in binary form must reproduce the above copyright 138585484eSchristos * notice, this list of conditions and the following disclaimer in the 148585484eSchristos * documentation and/or other materials provided with the distribution. 158585484eSchristos * 3. The name of the author may not be used to endorse or promote products 168585484eSchristos * derived from this software without specific prior written permission. 178585484eSchristos * 188585484eSchristos * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 198585484eSchristos * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 208585484eSchristos * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 218585484eSchristos * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 228585484eSchristos * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 238585484eSchristos * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 248585484eSchristos * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 258585484eSchristos * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 268585484eSchristos * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 278585484eSchristos * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 288585484eSchristos */ 298585484eSchristos #include "event2/event-config.h" 308585484eSchristos #include "evconfig-private.h" 318585484eSchristos 328585484eSchristos #ifdef _WIN32 338585484eSchristos #define WIN32_LEAN_AND_MEAN 348585484eSchristos #include <winsock2.h> 358585484eSchristos #include <windows.h> 368585484eSchristos #undef WIN32_LEAN_AND_MEAN 378585484eSchristos #endif 388585484eSchristos 398585484eSchristos #include <sys/types.h> 408585484eSchristos #ifndef _WIN32 418585484eSchristos #include <sys/socket.h> 428585484eSchristos #endif 438585484eSchristos #ifdef EVENT__HAVE_SYS_TIME_H 448585484eSchristos #include <sys/time.h> 458585484eSchristos #endif 468585484eSchristos #include <sys/queue.h> 478585484eSchristos #include <stdio.h> 488585484eSchristos #include <stdlib.h> 498585484eSchristos #ifndef _WIN32 508585484eSchristos #include <unistd.h> 518585484eSchristos #endif 528585484eSchristos #include <errno.h> 538585484eSchristos #include <signal.h> 548585484eSchristos #include <string.h> 558585484eSchristos 568585484eSchristos #include <sys/queue.h> 578585484eSchristos 588585484eSchristos #include "event2/event.h" 598585484eSchristos #include "event2/event_struct.h" 608585484eSchristos #include "event2/rpc.h" 618585484eSchristos #include "event2/rpc_struct.h" 628585484eSchristos #include "evrpc-internal.h" 638585484eSchristos #include "event2/http.h" 648585484eSchristos #include "event2/buffer.h" 658585484eSchristos #include "event2/tag.h" 668585484eSchristos #include "event2/http_struct.h" 678585484eSchristos #include "event2/http_compat.h" 688585484eSchristos #include "event2/util.h" 698585484eSchristos #include "util-internal.h" 708585484eSchristos #include "log-internal.h" 718585484eSchristos #include "mm-internal.h" 728585484eSchristos 738585484eSchristos struct evrpc_base * 748585484eSchristos evrpc_init(struct evhttp *http_server) 758585484eSchristos { 768585484eSchristos struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 778585484eSchristos if (base == NULL) 788585484eSchristos return (NULL); 798585484eSchristos 808585484eSchristos /* we rely on the tagging sub system */ 818585484eSchristos evtag_init(); 828585484eSchristos 838585484eSchristos TAILQ_INIT(&base->registered_rpcs); 848585484eSchristos TAILQ_INIT(&base->input_hooks); 858585484eSchristos TAILQ_INIT(&base->output_hooks); 868585484eSchristos 878585484eSchristos TAILQ_INIT(&base->paused_requests); 888585484eSchristos 898585484eSchristos base->http_server = http_server; 908585484eSchristos 918585484eSchristos return (base); 928585484eSchristos } 938585484eSchristos 948585484eSchristos void 958585484eSchristos evrpc_free(struct evrpc_base *base) 968585484eSchristos { 978585484eSchristos struct evrpc *rpc; 988585484eSchristos struct evrpc_hook *hook; 998585484eSchristos struct evrpc_hook_ctx *pause; 1008585484eSchristos int r; 1018585484eSchristos 1028585484eSchristos while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 1038585484eSchristos r = evrpc_unregister_rpc(base, rpc->uri); 1048585484eSchristos EVUTIL_ASSERT(r == 0); 1058585484eSchristos } 1068585484eSchristos while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 1078585484eSchristos TAILQ_REMOVE(&base->paused_requests, pause, next); 1088585484eSchristos mm_free(pause); 1098585484eSchristos } 1108585484eSchristos while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 1118585484eSchristos r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 1128585484eSchristos EVUTIL_ASSERT(r); 1138585484eSchristos } 1148585484eSchristos while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 1158585484eSchristos r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 1168585484eSchristos EVUTIL_ASSERT(r); 1178585484eSchristos } 1188585484eSchristos mm_free(base); 1198585484eSchristos } 1208585484eSchristos 1218585484eSchristos void * 1228585484eSchristos evrpc_add_hook(void *vbase, 1238585484eSchristos enum EVRPC_HOOK_TYPE hook_type, 1248585484eSchristos int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 1258585484eSchristos void *cb_arg) 1268585484eSchristos { 1278585484eSchristos struct evrpc_hooks_ *base = vbase; 1288585484eSchristos struct evrpc_hook_list *head = NULL; 1298585484eSchristos struct evrpc_hook *hook = NULL; 1308585484eSchristos switch (hook_type) { 1318585484eSchristos case EVRPC_INPUT: 1328585484eSchristos head = &base->in_hooks; 1338585484eSchristos break; 1348585484eSchristos case EVRPC_OUTPUT: 1358585484eSchristos head = &base->out_hooks; 1368585484eSchristos break; 1378585484eSchristos default: 1388585484eSchristos EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 1398585484eSchristos } 1408585484eSchristos 1418585484eSchristos hook = mm_calloc(1, sizeof(struct evrpc_hook)); 1428585484eSchristos EVUTIL_ASSERT(hook != NULL); 1438585484eSchristos 1448585484eSchristos hook->process = cb; 1458585484eSchristos hook->process_arg = cb_arg; 1468585484eSchristos TAILQ_INSERT_TAIL(head, hook, next); 1478585484eSchristos 1488585484eSchristos return (hook); 1498585484eSchristos } 1508585484eSchristos 1518585484eSchristos static int 1528585484eSchristos evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 1538585484eSchristos { 1548585484eSchristos struct evrpc_hook *hook = NULL; 1558585484eSchristos TAILQ_FOREACH(hook, head, next) { 1568585484eSchristos if (hook == handle) { 1578585484eSchristos TAILQ_REMOVE(head, hook, next); 1588585484eSchristos mm_free(hook); 1598585484eSchristos return (1); 1608585484eSchristos } 1618585484eSchristos } 1628585484eSchristos 1638585484eSchristos return (0); 1648585484eSchristos } 1658585484eSchristos 1668585484eSchristos /* 1678585484eSchristos * remove the hook specified by the handle 1688585484eSchristos */ 1698585484eSchristos 1708585484eSchristos int 1718585484eSchristos evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 1728585484eSchristos { 1738585484eSchristos struct evrpc_hooks_ *base = vbase; 1748585484eSchristos struct evrpc_hook_list *head = NULL; 1758585484eSchristos switch (hook_type) { 1768585484eSchristos case EVRPC_INPUT: 1778585484eSchristos head = &base->in_hooks; 1788585484eSchristos break; 1798585484eSchristos case EVRPC_OUTPUT: 1808585484eSchristos head = &base->out_hooks; 1818585484eSchristos break; 1828585484eSchristos default: 1838585484eSchristos EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 1848585484eSchristos } 1858585484eSchristos 1868585484eSchristos return (evrpc_remove_hook_internal(head, handle)); 1878585484eSchristos } 1888585484eSchristos 1898585484eSchristos static int 1908585484eSchristos evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 1918585484eSchristos struct evhttp_request *req, struct evbuffer *evbuf) 1928585484eSchristos { 1938585484eSchristos struct evrpc_hook *hook; 1948585484eSchristos TAILQ_FOREACH(hook, head, next) { 1958585484eSchristos int res = hook->process(ctx, req, evbuf, hook->process_arg); 1968585484eSchristos if (res != EVRPC_CONTINUE) 1978585484eSchristos return (res); 1988585484eSchristos } 1998585484eSchristos 2008585484eSchristos return (EVRPC_CONTINUE); 2018585484eSchristos } 2028585484eSchristos 2038585484eSchristos static void evrpc_pool_schedule(struct evrpc_pool *pool); 2048585484eSchristos static void evrpc_request_cb(struct evhttp_request *, void *); 2058585484eSchristos 2068585484eSchristos /* 2078585484eSchristos * Registers a new RPC with the HTTP server. The evrpc object is expected 2088585484eSchristos * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 2098585484eSchristos * calls this function. 2108585484eSchristos */ 2118585484eSchristos 2128585484eSchristos static char * 2138585484eSchristos evrpc_construct_uri(const char *uri) 2148585484eSchristos { 2158585484eSchristos char *constructed_uri; 2168585484eSchristos size_t constructed_uri_len; 2178585484eSchristos 2188585484eSchristos constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 2198585484eSchristos if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 2208585484eSchristos event_err(1, "%s: failed to register rpc at %s", 2218585484eSchristos __func__, uri); 2228585484eSchristos memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 2238585484eSchristos memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 2248585484eSchristos constructed_uri[constructed_uri_len - 1] = '\0'; 2258585484eSchristos 2268585484eSchristos return (constructed_uri); 2278585484eSchristos } 2288585484eSchristos 2298585484eSchristos int 2308585484eSchristos evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 2318585484eSchristos void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 2328585484eSchristos { 2338585484eSchristos char *constructed_uri = evrpc_construct_uri(rpc->uri); 2348585484eSchristos 2358585484eSchristos rpc->base = base; 2368585484eSchristos rpc->cb = cb; 2378585484eSchristos rpc->cb_arg = cb_arg; 2388585484eSchristos 2398585484eSchristos TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 2408585484eSchristos 2418585484eSchristos evhttp_set_cb(base->http_server, 2428585484eSchristos constructed_uri, 2438585484eSchristos evrpc_request_cb, 2448585484eSchristos rpc); 2458585484eSchristos 2468585484eSchristos mm_free(constructed_uri); 2478585484eSchristos 2488585484eSchristos return (0); 2498585484eSchristos } 2508585484eSchristos 2518585484eSchristos int 2528585484eSchristos evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 2538585484eSchristos { 2548585484eSchristos char *registered_uri = NULL; 2558585484eSchristos struct evrpc *rpc; 2568585484eSchristos int r; 2578585484eSchristos 2588585484eSchristos /* find the right rpc; linear search might be slow */ 2598585484eSchristos TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 2608585484eSchristos if (strcmp(rpc->uri, name) == 0) 2618585484eSchristos break; 2628585484eSchristos } 2638585484eSchristos if (rpc == NULL) { 2648585484eSchristos /* We did not find an RPC with this name */ 2658585484eSchristos return (-1); 2668585484eSchristos } 2678585484eSchristos TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 2688585484eSchristos 2698585484eSchristos registered_uri = evrpc_construct_uri(name); 2708585484eSchristos 2718585484eSchristos /* remove the http server callback */ 2728585484eSchristos r = evhttp_del_cb(base->http_server, registered_uri); 2738585484eSchristos EVUTIL_ASSERT(r == 0); 2748585484eSchristos 2758585484eSchristos mm_free(registered_uri); 2768585484eSchristos 2778585484eSchristos mm_free((char *)rpc->uri); 2788585484eSchristos mm_free(rpc); 2798585484eSchristos return (0); 2808585484eSchristos } 2818585484eSchristos 2828585484eSchristos static int evrpc_pause_request(void *vbase, void *ctx, 2838585484eSchristos void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 2848585484eSchristos static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 2858585484eSchristos 2868585484eSchristos static void 2878585484eSchristos evrpc_request_cb(struct evhttp_request *req, void *arg) 2888585484eSchristos { 2898585484eSchristos struct evrpc *rpc = arg; 2908585484eSchristos struct evrpc_req_generic *rpc_state = NULL; 2918585484eSchristos 2928585484eSchristos /* let's verify the outside parameters */ 2938585484eSchristos if (req->type != EVHTTP_REQ_POST || 2948585484eSchristos evbuffer_get_length(req->input_buffer) <= 0) 2958585484eSchristos goto error; 2968585484eSchristos 2978585484eSchristos rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 2988585484eSchristos if (rpc_state == NULL) 2998585484eSchristos goto error; 3008585484eSchristos rpc_state->rpc = rpc; 3018585484eSchristos rpc_state->http_req = req; 3028585484eSchristos rpc_state->rpc_data = NULL; 3038585484eSchristos 3048585484eSchristos if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 3058585484eSchristos int hook_res; 3068585484eSchristos 3078585484eSchristos evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 3088585484eSchristos 3098585484eSchristos /* 3108585484eSchristos * allow hooks to modify the outgoing request 3118585484eSchristos */ 3128585484eSchristos hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 3138585484eSchristos rpc_state, req, req->input_buffer); 3148585484eSchristos switch (hook_res) { 3158585484eSchristos case EVRPC_TERMINATE: 3168585484eSchristos goto error; 3178585484eSchristos case EVRPC_PAUSE: 3188585484eSchristos evrpc_pause_request(rpc->base, rpc_state, 3198585484eSchristos evrpc_request_cb_closure); 3208585484eSchristos return; 3218585484eSchristos case EVRPC_CONTINUE: 3228585484eSchristos break; 3238585484eSchristos default: 3248585484eSchristos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 3258585484eSchristos hook_res == EVRPC_CONTINUE || 3268585484eSchristos hook_res == EVRPC_PAUSE); 3278585484eSchristos } 3288585484eSchristos } 3298585484eSchristos 3308585484eSchristos evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 3318585484eSchristos return; 3328585484eSchristos 3338585484eSchristos error: 334*eabc0478Schristos if (rpc_state) 3358585484eSchristos evrpc_reqstate_free_(rpc_state); 3368585484eSchristos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 3378585484eSchristos return; 3388585484eSchristos } 3398585484eSchristos 3408585484eSchristos static void 3418585484eSchristos evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 3428585484eSchristos { 3438585484eSchristos struct evrpc_req_generic *rpc_state = arg; 3448585484eSchristos struct evrpc *rpc; 3458585484eSchristos struct evhttp_request *req; 3468585484eSchristos 3478585484eSchristos EVUTIL_ASSERT(rpc_state); 3488585484eSchristos rpc = rpc_state->rpc; 3498585484eSchristos req = rpc_state->http_req; 3508585484eSchristos 3518585484eSchristos if (hook_res == EVRPC_TERMINATE) 3528585484eSchristos goto error; 3538585484eSchristos 3548585484eSchristos /* let's check that we can parse the request */ 3558585484eSchristos rpc_state->request = rpc->request_new(rpc->request_new_arg); 3568585484eSchristos if (rpc_state->request == NULL) 3578585484eSchristos goto error; 3588585484eSchristos 3598585484eSchristos if (rpc->request_unmarshal( 3608585484eSchristos rpc_state->request, req->input_buffer) == -1) { 3618585484eSchristos /* we failed to parse the request; that's a bummer */ 3628585484eSchristos goto error; 3638585484eSchristos } 3648585484eSchristos 3658585484eSchristos /* at this point, we have a well formed request, prepare the reply */ 3668585484eSchristos 3678585484eSchristos rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 3688585484eSchristos if (rpc_state->reply == NULL) 3698585484eSchristos goto error; 3708585484eSchristos 3718585484eSchristos /* give the rpc to the user; they can deal with it */ 3728585484eSchristos rpc->cb(rpc_state, rpc->cb_arg); 3738585484eSchristos 3748585484eSchristos return; 3758585484eSchristos 3768585484eSchristos error: 3778585484eSchristos evrpc_reqstate_free_(rpc_state); 3788585484eSchristos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 3798585484eSchristos return; 3808585484eSchristos } 3818585484eSchristos 3828585484eSchristos 3838585484eSchristos void 3848585484eSchristos evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 3858585484eSchristos { 3868585484eSchristos struct evrpc *rpc; 3878585484eSchristos EVUTIL_ASSERT(rpc_state != NULL); 3888585484eSchristos rpc = rpc_state->rpc; 3898585484eSchristos 3908585484eSchristos /* clean up all memory */ 3918585484eSchristos if (rpc_state->hook_meta != NULL) 3928585484eSchristos evrpc_hook_context_free_(rpc_state->hook_meta); 3938585484eSchristos if (rpc_state->request != NULL) 3948585484eSchristos rpc->request_free(rpc_state->request); 3958585484eSchristos if (rpc_state->reply != NULL) 3968585484eSchristos rpc->reply_free(rpc_state->reply); 3978585484eSchristos if (rpc_state->rpc_data != NULL) 3988585484eSchristos evbuffer_free(rpc_state->rpc_data); 3998585484eSchristos mm_free(rpc_state); 4008585484eSchristos } 4018585484eSchristos 4028585484eSchristos static void 4038585484eSchristos evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 4048585484eSchristos 4058585484eSchristos void 4068585484eSchristos evrpc_request_done(struct evrpc_req_generic *rpc_state) 4078585484eSchristos { 4088585484eSchristos struct evhttp_request *req; 4098585484eSchristos struct evrpc *rpc; 4108585484eSchristos 4118585484eSchristos EVUTIL_ASSERT(rpc_state); 4128585484eSchristos 4138585484eSchristos req = rpc_state->http_req; 4148585484eSchristos rpc = rpc_state->rpc; 4158585484eSchristos 4168585484eSchristos if (rpc->reply_complete(rpc_state->reply) == -1) { 4178585484eSchristos /* the reply was not completely filled in. error out */ 4188585484eSchristos goto error; 4198585484eSchristos } 4208585484eSchristos 4218585484eSchristos if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 4228585484eSchristos /* out of memory */ 4238585484eSchristos goto error; 4248585484eSchristos } 4258585484eSchristos 4268585484eSchristos /* serialize the reply */ 4278585484eSchristos rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 4288585484eSchristos 4298585484eSchristos if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 4308585484eSchristos int hook_res; 4318585484eSchristos 4328585484eSchristos evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 4338585484eSchristos 4348585484eSchristos /* do hook based tweaks to the request */ 4358585484eSchristos hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 4368585484eSchristos rpc_state, req, rpc_state->rpc_data); 4378585484eSchristos switch (hook_res) { 4388585484eSchristos case EVRPC_TERMINATE: 4398585484eSchristos goto error; 4408585484eSchristos case EVRPC_PAUSE: 4418585484eSchristos if (evrpc_pause_request(rpc->base, rpc_state, 4428585484eSchristos evrpc_request_done_closure) == -1) 4438585484eSchristos goto error; 4448585484eSchristos return; 4458585484eSchristos case EVRPC_CONTINUE: 4468585484eSchristos break; 4478585484eSchristos default: 4488585484eSchristos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 4498585484eSchristos hook_res == EVRPC_CONTINUE || 4508585484eSchristos hook_res == EVRPC_PAUSE); 4518585484eSchristos } 4528585484eSchristos } 4538585484eSchristos 4548585484eSchristos evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 4558585484eSchristos return; 4568585484eSchristos 4578585484eSchristos error: 4588585484eSchristos evrpc_reqstate_free_(rpc_state); 4598585484eSchristos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 4608585484eSchristos return; 4618585484eSchristos } 4628585484eSchristos 4638585484eSchristos void * 4648585484eSchristos evrpc_get_request(struct evrpc_req_generic *req) 4658585484eSchristos { 4668585484eSchristos return req->request; 4678585484eSchristos } 4688585484eSchristos 4698585484eSchristos void * 4708585484eSchristos evrpc_get_reply(struct evrpc_req_generic *req) 4718585484eSchristos { 4728585484eSchristos return req->reply; 4738585484eSchristos } 4748585484eSchristos 4758585484eSchristos static void 4768585484eSchristos evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 4778585484eSchristos { 4788585484eSchristos struct evrpc_req_generic *rpc_state = arg; 4798585484eSchristos struct evhttp_request *req; 4808585484eSchristos EVUTIL_ASSERT(rpc_state); 4818585484eSchristos req = rpc_state->http_req; 4828585484eSchristos 4838585484eSchristos if (hook_res == EVRPC_TERMINATE) 4848585484eSchristos goto error; 4858585484eSchristos 4868585484eSchristos /* on success, we are going to transmit marshaled binary data */ 4878585484eSchristos if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 4888585484eSchristos evhttp_add_header(req->output_headers, 4898585484eSchristos "Content-Type", "application/octet-stream"); 4908585484eSchristos } 4918585484eSchristos evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 4928585484eSchristos 4938585484eSchristos evrpc_reqstate_free_(rpc_state); 4948585484eSchristos 4958585484eSchristos return; 4968585484eSchristos 4978585484eSchristos error: 4988585484eSchristos evrpc_reqstate_free_(rpc_state); 4998585484eSchristos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 5008585484eSchristos return; 5018585484eSchristos } 5028585484eSchristos 5038585484eSchristos 5048585484eSchristos /* Client implementation of RPC site */ 5058585484eSchristos 5068585484eSchristos static int evrpc_schedule_request(struct evhttp_connection *connection, 5078585484eSchristos struct evrpc_request_wrapper *ctx); 5088585484eSchristos 5098585484eSchristos struct evrpc_pool * 5108585484eSchristos evrpc_pool_new(struct event_base *base) 5118585484eSchristos { 5128585484eSchristos struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 5138585484eSchristos if (pool == NULL) 5148585484eSchristos return (NULL); 5158585484eSchristos 5168585484eSchristos TAILQ_INIT(&pool->connections); 5178585484eSchristos TAILQ_INIT(&pool->requests); 5188585484eSchristos 5198585484eSchristos TAILQ_INIT(&pool->paused_requests); 5208585484eSchristos 5218585484eSchristos TAILQ_INIT(&pool->input_hooks); 5228585484eSchristos TAILQ_INIT(&pool->output_hooks); 5238585484eSchristos 5248585484eSchristos pool->base = base; 5258585484eSchristos pool->timeout = -1; 5268585484eSchristos 5278585484eSchristos return (pool); 5288585484eSchristos } 5298585484eSchristos 5308585484eSchristos static void 5318585484eSchristos evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 5328585484eSchristos { 5338585484eSchristos if (request->hook_meta != NULL) 5348585484eSchristos evrpc_hook_context_free_(request->hook_meta); 5358585484eSchristos mm_free(request->name); 5368585484eSchristos mm_free(request); 5378585484eSchristos } 5388585484eSchristos 5398585484eSchristos void 5408585484eSchristos evrpc_pool_free(struct evrpc_pool *pool) 5418585484eSchristos { 5428585484eSchristos struct evhttp_connection *connection; 5438585484eSchristos struct evrpc_request_wrapper *request; 5448585484eSchristos struct evrpc_hook_ctx *pause; 5458585484eSchristos struct evrpc_hook *hook; 5468585484eSchristos int r; 5478585484eSchristos 5488585484eSchristos while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 5498585484eSchristos TAILQ_REMOVE(&pool->requests, request, next); 5508585484eSchristos evrpc_request_wrapper_free(request); 5518585484eSchristos } 5528585484eSchristos 5538585484eSchristos while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 5548585484eSchristos TAILQ_REMOVE(&pool->paused_requests, pause, next); 5558585484eSchristos mm_free(pause); 5568585484eSchristos } 5578585484eSchristos 5588585484eSchristos while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 5598585484eSchristos TAILQ_REMOVE(&pool->connections, connection, next); 5608585484eSchristos evhttp_connection_free(connection); 5618585484eSchristos } 5628585484eSchristos 5638585484eSchristos while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 5648585484eSchristos r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 5658585484eSchristos EVUTIL_ASSERT(r); 5668585484eSchristos } 5678585484eSchristos 5688585484eSchristos while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 5698585484eSchristos r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 5708585484eSchristos EVUTIL_ASSERT(r); 5718585484eSchristos } 5728585484eSchristos 5738585484eSchristos mm_free(pool); 5748585484eSchristos } 5758585484eSchristos 5768585484eSchristos /* 5778585484eSchristos * Add a connection to the RPC pool. A request scheduled on the pool 5788585484eSchristos * may use any available connection. 5798585484eSchristos */ 5808585484eSchristos 5818585484eSchristos void 5828585484eSchristos evrpc_pool_add_connection(struct evrpc_pool *pool, 5838585484eSchristos struct evhttp_connection *connection) 5848585484eSchristos { 5858585484eSchristos EVUTIL_ASSERT(connection->http_server == NULL); 5868585484eSchristos TAILQ_INSERT_TAIL(&pool->connections, connection, next); 5878585484eSchristos 5888585484eSchristos /* 5898585484eSchristos * associate an event base with this connection 5908585484eSchristos */ 5918585484eSchristos if (pool->base != NULL) 5928585484eSchristos evhttp_connection_set_base(connection, pool->base); 5938585484eSchristos 5948585484eSchristos /* 5958585484eSchristos * unless a timeout was specifically set for a connection, 5968585484eSchristos * the connection inherits the timeout from the pool. 5978585484eSchristos */ 5988585484eSchristos if (!evutil_timerisset(&connection->timeout)) 5998585484eSchristos evhttp_connection_set_timeout(connection, pool->timeout); 6008585484eSchristos 6018585484eSchristos /* 6028585484eSchristos * if we have any requests pending, schedule them with the new 6038585484eSchristos * connections. 6048585484eSchristos */ 6058585484eSchristos 6068585484eSchristos if (TAILQ_FIRST(&pool->requests) != NULL) { 6078585484eSchristos struct evrpc_request_wrapper *request = 6088585484eSchristos TAILQ_FIRST(&pool->requests); 6098585484eSchristos TAILQ_REMOVE(&pool->requests, request, next); 6108585484eSchristos evrpc_schedule_request(connection, request); 6118585484eSchristos } 6128585484eSchristos } 6138585484eSchristos 6148585484eSchristos void 6158585484eSchristos evrpc_pool_remove_connection(struct evrpc_pool *pool, 6168585484eSchristos struct evhttp_connection *connection) 6178585484eSchristos { 6188585484eSchristos TAILQ_REMOVE(&pool->connections, connection, next); 6198585484eSchristos } 6208585484eSchristos 6218585484eSchristos void 6228585484eSchristos evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 6238585484eSchristos { 6248585484eSchristos struct evhttp_connection *evcon; 6258585484eSchristos TAILQ_FOREACH(evcon, &pool->connections, next) { 6268585484eSchristos evhttp_connection_set_timeout(evcon, timeout_in_secs); 6278585484eSchristos } 6288585484eSchristos pool->timeout = timeout_in_secs; 6298585484eSchristos } 6308585484eSchristos 6318585484eSchristos 6328585484eSchristos static void evrpc_reply_done(struct evhttp_request *, void *); 6338585484eSchristos static void evrpc_request_timeout(evutil_socket_t, short, void *); 6348585484eSchristos 6358585484eSchristos /* 6368585484eSchristos * Finds a connection object associated with the pool that is currently 6378585484eSchristos * idle and can be used to make a request. 6388585484eSchristos */ 6398585484eSchristos static struct evhttp_connection * 6408585484eSchristos evrpc_pool_find_connection(struct evrpc_pool *pool) 6418585484eSchristos { 6428585484eSchristos struct evhttp_connection *connection; 6438585484eSchristos TAILQ_FOREACH(connection, &pool->connections, next) { 6448585484eSchristos if (TAILQ_FIRST(&connection->requests) == NULL) 6458585484eSchristos return (connection); 6468585484eSchristos } 6478585484eSchristos 6488585484eSchristos return (NULL); 6498585484eSchristos } 6508585484eSchristos 6518585484eSchristos /* 6528585484eSchristos * Prototypes responsible for evrpc scheduling and hooking 6538585484eSchristos */ 6548585484eSchristos 6558585484eSchristos static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 6568585484eSchristos 6578585484eSchristos /* 6588585484eSchristos * We assume that the ctx is no longer queued on the pool. 6598585484eSchristos */ 6608585484eSchristos static int 6618585484eSchristos evrpc_schedule_request(struct evhttp_connection *connection, 6628585484eSchristos struct evrpc_request_wrapper *ctx) 6638585484eSchristos { 6648585484eSchristos struct evhttp_request *req = NULL; 6658585484eSchristos struct evrpc_pool *pool = ctx->pool; 6668585484eSchristos struct evrpc_status status; 6678585484eSchristos 6688585484eSchristos if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 6698585484eSchristos goto error; 6708585484eSchristos 6718585484eSchristos /* serialize the request data into the output buffer */ 6728585484eSchristos ctx->request_marshal(req->output_buffer, ctx->request); 6738585484eSchristos 6748585484eSchristos /* we need to know the connection that we might have to abort */ 6758585484eSchristos ctx->evcon = connection; 6768585484eSchristos 6778585484eSchristos /* if we get paused we also need to know the request */ 6788585484eSchristos ctx->req = req; 6798585484eSchristos 6808585484eSchristos if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 6818585484eSchristos int hook_res; 6828585484eSchristos 6838585484eSchristos evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 6848585484eSchristos 6858585484eSchristos /* apply hooks to the outgoing request */ 6868585484eSchristos hook_res = evrpc_process_hooks(&pool->output_hooks, 6878585484eSchristos ctx, req, req->output_buffer); 6888585484eSchristos 6898585484eSchristos switch (hook_res) { 6908585484eSchristos case EVRPC_TERMINATE: 6918585484eSchristos goto error; 6928585484eSchristos case EVRPC_PAUSE: 6938585484eSchristos /* we need to be explicitly resumed */ 6948585484eSchristos if (evrpc_pause_request(pool, ctx, 6958585484eSchristos evrpc_schedule_request_closure) == -1) 6968585484eSchristos goto error; 6978585484eSchristos return (0); 6988585484eSchristos case EVRPC_CONTINUE: 6998585484eSchristos /* we can just continue */ 7008585484eSchristos break; 7018585484eSchristos default: 7028585484eSchristos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 7038585484eSchristos hook_res == EVRPC_CONTINUE || 7048585484eSchristos hook_res == EVRPC_PAUSE); 7058585484eSchristos } 7068585484eSchristos } 7078585484eSchristos 7088585484eSchristos evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 7098585484eSchristos return (0); 7108585484eSchristos 7118585484eSchristos error: 7128585484eSchristos memset(&status, 0, sizeof(status)); 7138585484eSchristos status.error = EVRPC_STATUS_ERR_UNSTARTED; 7148585484eSchristos (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 7158585484eSchristos evrpc_request_wrapper_free(ctx); 7168585484eSchristos return (-1); 7178585484eSchristos } 7188585484eSchristos 7198585484eSchristos static void 7208585484eSchristos evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 7218585484eSchristos { 7228585484eSchristos struct evrpc_request_wrapper *ctx = arg; 7238585484eSchristos struct evhttp_connection *connection = ctx->evcon; 7248585484eSchristos struct evhttp_request *req = ctx->req; 7258585484eSchristos struct evrpc_pool *pool = ctx->pool; 7268585484eSchristos struct evrpc_status status; 7278585484eSchristos char *uri = NULL; 7288585484eSchristos int res = 0; 7298585484eSchristos 7308585484eSchristos if (hook_res == EVRPC_TERMINATE) 7318585484eSchristos goto error; 7328585484eSchristos 7338585484eSchristos uri = evrpc_construct_uri(ctx->name); 7348585484eSchristos if (uri == NULL) 7358585484eSchristos goto error; 7368585484eSchristos 7378585484eSchristos if (pool->timeout > 0) { 7388585484eSchristos /* 7398585484eSchristos * a timeout after which the whole rpc is going to be aborted. 7408585484eSchristos */ 7418585484eSchristos struct timeval tv; 7428585484eSchristos evutil_timerclear(&tv); 7438585484eSchristos tv.tv_sec = pool->timeout; 7448585484eSchristos evtimer_add(&ctx->ev_timeout, &tv); 7458585484eSchristos } 7468585484eSchristos 7478585484eSchristos /* start the request over the connection */ 7488585484eSchristos res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 7498585484eSchristos mm_free(uri); 7508585484eSchristos 7518585484eSchristos if (res == -1) 7528585484eSchristos goto error; 7538585484eSchristos 7548585484eSchristos return; 7558585484eSchristos 7568585484eSchristos error: 7578585484eSchristos memset(&status, 0, sizeof(status)); 7588585484eSchristos status.error = EVRPC_STATUS_ERR_UNSTARTED; 7598585484eSchristos (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 7608585484eSchristos evrpc_request_wrapper_free(ctx); 7618585484eSchristos } 7628585484eSchristos 7638585484eSchristos /* we just queue the paused request on the pool under the req object */ 7648585484eSchristos static int 7658585484eSchristos evrpc_pause_request(void *vbase, void *ctx, 7668585484eSchristos void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 7678585484eSchristos { 7688585484eSchristos struct evrpc_hooks_ *base = vbase; 7698585484eSchristos struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 7708585484eSchristos if (pause == NULL) 7718585484eSchristos return (-1); 7728585484eSchristos 7738585484eSchristos pause->ctx = ctx; 7748585484eSchristos pause->cb = cb; 7758585484eSchristos 7768585484eSchristos TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 7778585484eSchristos return (0); 7788585484eSchristos } 7798585484eSchristos 7808585484eSchristos int 7818585484eSchristos evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 7828585484eSchristos { 7838585484eSchristos struct evrpc_hooks_ *base = vbase; 7848585484eSchristos struct evrpc_pause_list *head = &base->pause_requests; 7858585484eSchristos struct evrpc_hook_ctx *pause; 7868585484eSchristos 7878585484eSchristos TAILQ_FOREACH(pause, head, next) { 7888585484eSchristos if (pause->ctx == ctx) 7898585484eSchristos break; 7908585484eSchristos } 7918585484eSchristos 7928585484eSchristos if (pause == NULL) 7938585484eSchristos return (-1); 7948585484eSchristos 7958585484eSchristos (*pause->cb)(pause->ctx, res); 7968585484eSchristos TAILQ_REMOVE(head, pause, next); 7978585484eSchristos mm_free(pause); 7988585484eSchristos return (0); 7998585484eSchristos } 8008585484eSchristos 8018585484eSchristos int 8028585484eSchristos evrpc_make_request(struct evrpc_request_wrapper *ctx) 8038585484eSchristos { 8048585484eSchristos struct evrpc_pool *pool = ctx->pool; 8058585484eSchristos 8068585484eSchristos /* initialize the event structure for this rpc */ 8078585484eSchristos evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 8088585484eSchristos 8098585484eSchristos /* we better have some available connections on the pool */ 8108585484eSchristos EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 8118585484eSchristos 8128585484eSchristos /* 8138585484eSchristos * if no connection is available, we queue the request on the pool, 8148585484eSchristos * the next time a connection is empty, the rpc will be send on that. 8158585484eSchristos */ 8168585484eSchristos TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 8178585484eSchristos 8188585484eSchristos evrpc_pool_schedule(pool); 8198585484eSchristos 8208585484eSchristos return (0); 8218585484eSchristos } 8228585484eSchristos 8238585484eSchristos 8248585484eSchristos struct evrpc_request_wrapper * 8258585484eSchristos evrpc_make_request_ctx( 8268585484eSchristos struct evrpc_pool *pool, void *request, void *reply, 8278585484eSchristos const char *rpcname, 8288585484eSchristos void (*req_marshal)(struct evbuffer*, void *), 8298585484eSchristos void (*rpl_clear)(void *), 8308585484eSchristos int (*rpl_unmarshal)(void *, struct evbuffer *), 8318585484eSchristos void (*cb)(struct evrpc_status *, void *, void *, void *), 8328585484eSchristos void *cbarg) 8338585484eSchristos { 8348585484eSchristos struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 8358585484eSchristos mm_malloc(sizeof(struct evrpc_request_wrapper)); 8368585484eSchristos if (ctx == NULL) 8378585484eSchristos return (NULL); 8388585484eSchristos 8398585484eSchristos ctx->pool = pool; 8408585484eSchristos ctx->hook_meta = NULL; 8418585484eSchristos ctx->evcon = NULL; 8428585484eSchristos ctx->name = mm_strdup(rpcname); 8438585484eSchristos if (ctx->name == NULL) { 8448585484eSchristos mm_free(ctx); 8458585484eSchristos return (NULL); 8468585484eSchristos } 8478585484eSchristos ctx->cb = cb; 8488585484eSchristos ctx->cb_arg = cbarg; 8498585484eSchristos ctx->request = request; 8508585484eSchristos ctx->reply = reply; 8518585484eSchristos ctx->request_marshal = req_marshal; 8528585484eSchristos ctx->reply_clear = rpl_clear; 8538585484eSchristos ctx->reply_unmarshal = rpl_unmarshal; 8548585484eSchristos 8558585484eSchristos return (ctx); 8568585484eSchristos } 8578585484eSchristos 8588585484eSchristos static void 8598585484eSchristos evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 8608585484eSchristos 8618585484eSchristos static void 8628585484eSchristos evrpc_reply_done(struct evhttp_request *req, void *arg) 8638585484eSchristos { 8648585484eSchristos struct evrpc_request_wrapper *ctx = arg; 8658585484eSchristos struct evrpc_pool *pool = ctx->pool; 8668585484eSchristos int hook_res = EVRPC_CONTINUE; 8678585484eSchristos 8688585484eSchristos /* cancel any timeout we might have scheduled */ 8698585484eSchristos event_del(&ctx->ev_timeout); 8708585484eSchristos 8718585484eSchristos ctx->req = req; 8728585484eSchristos 8738585484eSchristos /* we need to get the reply now */ 8748585484eSchristos if (req == NULL) { 8758585484eSchristos evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 8768585484eSchristos return; 8778585484eSchristos } 8788585484eSchristos 8798585484eSchristos if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 8808585484eSchristos evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 8818585484eSchristos 8828585484eSchristos /* apply hooks to the incoming request */ 8838585484eSchristos hook_res = evrpc_process_hooks(&pool->input_hooks, 8848585484eSchristos ctx, req, req->input_buffer); 8858585484eSchristos 8868585484eSchristos switch (hook_res) { 8878585484eSchristos case EVRPC_TERMINATE: 8888585484eSchristos case EVRPC_CONTINUE: 8898585484eSchristos break; 8908585484eSchristos case EVRPC_PAUSE: 8918585484eSchristos /* 8928585484eSchristos * if we get paused we also need to know the 8938585484eSchristos * request. unfortunately, the underlying 8948585484eSchristos * layer is going to free it. we need to 8958585484eSchristos * request ownership explicitly 8968585484eSchristos */ 8978585484eSchristos evhttp_request_own(req); 8988585484eSchristos 8998585484eSchristos evrpc_pause_request(pool, ctx, 9008585484eSchristos evrpc_reply_done_closure); 9018585484eSchristos return; 9028585484eSchristos default: 9038585484eSchristos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 9048585484eSchristos hook_res == EVRPC_CONTINUE || 9058585484eSchristos hook_res == EVRPC_PAUSE); 9068585484eSchristos } 9078585484eSchristos } 9088585484eSchristos 9098585484eSchristos evrpc_reply_done_closure(ctx, hook_res); 9108585484eSchristos 9118585484eSchristos /* http request is being freed by underlying layer */ 9128585484eSchristos } 9138585484eSchristos 9148585484eSchristos static void 9158585484eSchristos evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 9168585484eSchristos { 9178585484eSchristos struct evrpc_request_wrapper *ctx = arg; 9188585484eSchristos struct evhttp_request *req = ctx->req; 9198585484eSchristos struct evrpc_pool *pool = ctx->pool; 9208585484eSchristos struct evrpc_status status; 9218585484eSchristos int res = -1; 9228585484eSchristos 9238585484eSchristos memset(&status, 0, sizeof(status)); 9248585484eSchristos status.http_req = req; 9258585484eSchristos 9268585484eSchristos /* we need to get the reply now */ 9278585484eSchristos if (req == NULL) { 9288585484eSchristos status.error = EVRPC_STATUS_ERR_TIMEOUT; 9298585484eSchristos } else if (hook_res == EVRPC_TERMINATE) { 9308585484eSchristos status.error = EVRPC_STATUS_ERR_HOOKABORTED; 9318585484eSchristos } else { 9328585484eSchristos res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 9338585484eSchristos if (res == -1) 9348585484eSchristos status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 9358585484eSchristos } 9368585484eSchristos 9378585484eSchristos if (res == -1) { 9388585484eSchristos /* clear everything that we might have written previously */ 9398585484eSchristos ctx->reply_clear(ctx->reply); 9408585484eSchristos } 9418585484eSchristos 9428585484eSchristos (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 9438585484eSchristos 9448585484eSchristos evrpc_request_wrapper_free(ctx); 9458585484eSchristos 9468585484eSchristos /* the http layer owned the original request structure, but if we 9478585484eSchristos * got paused, we asked for ownership and need to free it here. */ 9488585484eSchristos if (req != NULL && evhttp_request_is_owned(req)) 9498585484eSchristos evhttp_request_free(req); 9508585484eSchristos 9518585484eSchristos /* see if we can schedule another request */ 9528585484eSchristos evrpc_pool_schedule(pool); 9538585484eSchristos } 9548585484eSchristos 9558585484eSchristos static void 9568585484eSchristos evrpc_pool_schedule(struct evrpc_pool *pool) 9578585484eSchristos { 9588585484eSchristos struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 9598585484eSchristos struct evhttp_connection *evcon; 9608585484eSchristos 9618585484eSchristos /* if no requests are pending, we have no work */ 9628585484eSchristos if (ctx == NULL) 9638585484eSchristos return; 9648585484eSchristos 9658585484eSchristos if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 9668585484eSchristos TAILQ_REMOVE(&pool->requests, ctx, next); 9678585484eSchristos evrpc_schedule_request(evcon, ctx); 9688585484eSchristos } 9698585484eSchristos } 9708585484eSchristos 9718585484eSchristos static void 9728585484eSchristos evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 9738585484eSchristos { 9748585484eSchristos struct evrpc_request_wrapper *ctx = arg; 9758585484eSchristos struct evhttp_connection *evcon = ctx->evcon; 9768585484eSchristos EVUTIL_ASSERT(evcon != NULL); 9778585484eSchristos 978b8ecfcfeSchristos evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 9798585484eSchristos } 9808585484eSchristos 9818585484eSchristos /* 9828585484eSchristos * frees potential meta data associated with a request. 9838585484eSchristos */ 9848585484eSchristos 9858585484eSchristos static void 9868585484eSchristos evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 9878585484eSchristos { 9888585484eSchristos struct evrpc_meta *entry; 9898585484eSchristos EVUTIL_ASSERT(meta_data != NULL); 9908585484eSchristos 9918585484eSchristos while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 9928585484eSchristos TAILQ_REMOVE(meta_data, entry, next); 9938585484eSchristos mm_free(entry->key); 9948585484eSchristos mm_free(entry->data); 9958585484eSchristos mm_free(entry); 9968585484eSchristos } 9978585484eSchristos } 9988585484eSchristos 9998585484eSchristos static struct evrpc_hook_meta * 10008585484eSchristos evrpc_hook_meta_new_(void) 10018585484eSchristos { 10028585484eSchristos struct evrpc_hook_meta *ctx; 10038585484eSchristos ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 10048585484eSchristos EVUTIL_ASSERT(ctx != NULL); 10058585484eSchristos 10068585484eSchristos TAILQ_INIT(&ctx->meta_data); 10078585484eSchristos ctx->evcon = NULL; 10088585484eSchristos 10098585484eSchristos return (ctx); 10108585484eSchristos } 10118585484eSchristos 10128585484eSchristos static void 10138585484eSchristos evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 10148585484eSchristos struct evhttp_connection *evcon) 10158585484eSchristos { 10168585484eSchristos struct evrpc_hook_meta *ctx = *pctx; 10178585484eSchristos if (ctx == NULL) 10188585484eSchristos *pctx = ctx = evrpc_hook_meta_new_(); 10198585484eSchristos ctx->evcon = evcon; 10208585484eSchristos } 10218585484eSchristos 10228585484eSchristos static void 10238585484eSchristos evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 10248585484eSchristos { 10258585484eSchristos evrpc_meta_data_free(&ctx->meta_data); 10268585484eSchristos mm_free(ctx); 10278585484eSchristos } 10288585484eSchristos 10298585484eSchristos /* Adds meta data */ 10308585484eSchristos void 10318585484eSchristos evrpc_hook_add_meta(void *ctx, const char *key, 10328585484eSchristos const void *data, size_t data_size) 10338585484eSchristos { 10348585484eSchristos struct evrpc_request_wrapper *req = ctx; 10358585484eSchristos struct evrpc_hook_meta *store = NULL; 10368585484eSchristos struct evrpc_meta *meta = NULL; 10378585484eSchristos 10388585484eSchristos if ((store = req->hook_meta) == NULL) 10398585484eSchristos store = req->hook_meta = evrpc_hook_meta_new_(); 10408585484eSchristos 10418585484eSchristos meta = mm_malloc(sizeof(struct evrpc_meta)); 10428585484eSchristos EVUTIL_ASSERT(meta != NULL); 10438585484eSchristos meta->key = mm_strdup(key); 10448585484eSchristos EVUTIL_ASSERT(meta->key != NULL); 10458585484eSchristos meta->data_size = data_size; 10468585484eSchristos meta->data = mm_malloc(data_size); 10478585484eSchristos EVUTIL_ASSERT(meta->data != NULL); 10488585484eSchristos memcpy(meta->data, data, data_size); 10498585484eSchristos 10508585484eSchristos TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 10518585484eSchristos } 10528585484eSchristos 10538585484eSchristos int 10548585484eSchristos evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 10558585484eSchristos { 10568585484eSchristos struct evrpc_request_wrapper *req = ctx; 10578585484eSchristos struct evrpc_meta *meta = NULL; 10588585484eSchristos 10598585484eSchristos if (req->hook_meta == NULL) 10608585484eSchristos return (-1); 10618585484eSchristos 10628585484eSchristos TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 10638585484eSchristos if (strcmp(meta->key, key) == 0) { 10648585484eSchristos *data = meta->data; 10658585484eSchristos *data_size = meta->data_size; 10668585484eSchristos return (0); 10678585484eSchristos } 10688585484eSchristos } 10698585484eSchristos 10708585484eSchristos return (-1); 10718585484eSchristos } 10728585484eSchristos 10738585484eSchristos struct evhttp_connection * 10748585484eSchristos evrpc_hook_get_connection(void *ctx) 10758585484eSchristos { 10768585484eSchristos struct evrpc_request_wrapper *req = ctx; 10778585484eSchristos return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 10788585484eSchristos } 10798585484eSchristos 10808585484eSchristos int 10818585484eSchristos evrpc_send_request_generic(struct evrpc_pool *pool, 10828585484eSchristos void *request, void *reply, 10838585484eSchristos void (*cb)(struct evrpc_status *, void *, void *, void *), 10848585484eSchristos void *cb_arg, 10858585484eSchristos const char *rpcname, 10868585484eSchristos void (*req_marshal)(struct evbuffer *, void *), 10878585484eSchristos void (*rpl_clear)(void *), 10888585484eSchristos int (*rpl_unmarshal)(void *, struct evbuffer *)) 10898585484eSchristos { 10908585484eSchristos struct evrpc_status status; 10918585484eSchristos struct evrpc_request_wrapper *ctx; 10928585484eSchristos ctx = evrpc_make_request_ctx(pool, request, reply, 10938585484eSchristos rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 10948585484eSchristos if (ctx == NULL) 10958585484eSchristos goto error; 10968585484eSchristos return (evrpc_make_request(ctx)); 10978585484eSchristos error: 10988585484eSchristos memset(&status, 0, sizeof(status)); 10998585484eSchristos status.error = EVRPC_STATUS_ERR_UNSTARTED; 11008585484eSchristos (*(cb))(&status, request, reply, cb_arg); 11018585484eSchristos return (-1); 11028585484eSchristos } 11038585484eSchristos 11048585484eSchristos /** Takes a request object and fills it in with the right magic */ 11058585484eSchristos static struct evrpc * 11068585484eSchristos evrpc_register_object(const char *name, 11078585484eSchristos void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 11088585484eSchristos int (*req_unmarshal)(void *, struct evbuffer *), 11098585484eSchristos void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 11108585484eSchristos int (*rpl_complete)(void *), 11118585484eSchristos void (*rpl_marshal)(struct evbuffer *, void *)) 11128585484eSchristos { 11138585484eSchristos struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 11148585484eSchristos if (rpc == NULL) 11158585484eSchristos return (NULL); 11168585484eSchristos rpc->uri = mm_strdup(name); 11178585484eSchristos if (rpc->uri == NULL) { 11188585484eSchristos mm_free(rpc); 11198585484eSchristos return (NULL); 11208585484eSchristos } 11218585484eSchristos rpc->request_new = req_new; 11228585484eSchristos rpc->request_new_arg = req_new_arg; 11238585484eSchristos rpc->request_free = req_free; 11248585484eSchristos rpc->request_unmarshal = req_unmarshal; 11258585484eSchristos rpc->reply_new = rpl_new; 11268585484eSchristos rpc->reply_new_arg = rpl_new_arg; 11278585484eSchristos rpc->reply_free = rpl_free; 11288585484eSchristos rpc->reply_complete = rpl_complete; 11298585484eSchristos rpc->reply_marshal = rpl_marshal; 11308585484eSchristos return (rpc); 11318585484eSchristos } 11328585484eSchristos 11338585484eSchristos int 11348585484eSchristos evrpc_register_generic(struct evrpc_base *base, const char *name, 11358585484eSchristos void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 11368585484eSchristos void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 11378585484eSchristos int (*req_unmarshal)(void *, struct evbuffer *), 11388585484eSchristos void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 11398585484eSchristos int (*rpl_complete)(void *), 11408585484eSchristos void (*rpl_marshal)(struct evbuffer *, void *)) 11418585484eSchristos { 11428585484eSchristos struct evrpc* rpc = 11438585484eSchristos evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 11448585484eSchristos rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 11458585484eSchristos if (rpc == NULL) 11468585484eSchristos return (-1); 11478585484eSchristos evrpc_register_rpc(base, rpc, 11488585484eSchristos (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 11498585484eSchristos return (0); 11508585484eSchristos } 11518585484eSchristos 11528585484eSchristos /** accessors for obscure and undocumented functionality */ 11538585484eSchristos struct evrpc_pool * 11548585484eSchristos evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 11558585484eSchristos { 11568585484eSchristos return (ctx->pool); 11578585484eSchristos } 11588585484eSchristos 11598585484eSchristos void 11608585484eSchristos evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 11618585484eSchristos struct evrpc_pool *pool) 11628585484eSchristos { 11638585484eSchristos ctx->pool = pool; 11648585484eSchristos } 11658585484eSchristos 11668585484eSchristos void 11678585484eSchristos evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 11688585484eSchristos void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 11698585484eSchristos void *cb_arg) 11708585484eSchristos { 11718585484eSchristos ctx->cb = cb; 11728585484eSchristos ctx->cb_arg = cb_arg; 11738585484eSchristos } 1174