xref: /netbsd-src/external/bsd/ntp/dist/sntp/libevent/evrpc.c (revision eabc0478de71e4e011a5b4e0392741e01d491794)
1*eabc0478Schristos /*	$NetBSD: evrpc.c,v 1.6 2024/08/18 20:47:21 christos Exp $	*/
28585484eSchristos 
38585484eSchristos /*
48585484eSchristos  * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
58585484eSchristos  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
68585484eSchristos  *
78585484eSchristos  * Redistribution and use in source and binary forms, with or without
88585484eSchristos  * modification, are permitted provided that the following conditions
98585484eSchristos  * are met:
108585484eSchristos  * 1. Redistributions of source code must retain the above copyright
118585484eSchristos  *    notice, this list of conditions and the following disclaimer.
128585484eSchristos  * 2. Redistributions in binary form must reproduce the above copyright
138585484eSchristos  *    notice, this list of conditions and the following disclaimer in the
148585484eSchristos  *    documentation and/or other materials provided with the distribution.
158585484eSchristos  * 3. The name of the author may not be used to endorse or promote products
168585484eSchristos  *    derived from this software without specific prior written permission.
178585484eSchristos  *
188585484eSchristos  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
198585484eSchristos  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
208585484eSchristos  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
218585484eSchristos  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
228585484eSchristos  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
238585484eSchristos  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
248585484eSchristos  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
258585484eSchristos  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
268585484eSchristos  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
278585484eSchristos  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
288585484eSchristos  */
298585484eSchristos #include "event2/event-config.h"
308585484eSchristos #include "evconfig-private.h"
318585484eSchristos 
328585484eSchristos #ifdef _WIN32
338585484eSchristos #define WIN32_LEAN_AND_MEAN
348585484eSchristos #include <winsock2.h>
358585484eSchristos #include <windows.h>
368585484eSchristos #undef WIN32_LEAN_AND_MEAN
378585484eSchristos #endif
388585484eSchristos 
398585484eSchristos #include <sys/types.h>
408585484eSchristos #ifndef _WIN32
418585484eSchristos #include <sys/socket.h>
428585484eSchristos #endif
438585484eSchristos #ifdef EVENT__HAVE_SYS_TIME_H
448585484eSchristos #include <sys/time.h>
458585484eSchristos #endif
468585484eSchristos #include <sys/queue.h>
478585484eSchristos #include <stdio.h>
488585484eSchristos #include <stdlib.h>
498585484eSchristos #ifndef _WIN32
508585484eSchristos #include <unistd.h>
518585484eSchristos #endif
528585484eSchristos #include <errno.h>
538585484eSchristos #include <signal.h>
548585484eSchristos #include <string.h>
558585484eSchristos 
568585484eSchristos #include <sys/queue.h>
578585484eSchristos 
588585484eSchristos #include "event2/event.h"
598585484eSchristos #include "event2/event_struct.h"
608585484eSchristos #include "event2/rpc.h"
618585484eSchristos #include "event2/rpc_struct.h"
628585484eSchristos #include "evrpc-internal.h"
638585484eSchristos #include "event2/http.h"
648585484eSchristos #include "event2/buffer.h"
658585484eSchristos #include "event2/tag.h"
668585484eSchristos #include "event2/http_struct.h"
678585484eSchristos #include "event2/http_compat.h"
688585484eSchristos #include "event2/util.h"
698585484eSchristos #include "util-internal.h"
708585484eSchristos #include "log-internal.h"
718585484eSchristos #include "mm-internal.h"
728585484eSchristos 
738585484eSchristos struct evrpc_base *
748585484eSchristos evrpc_init(struct evhttp *http_server)
758585484eSchristos {
768585484eSchristos 	struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
778585484eSchristos 	if (base == NULL)
788585484eSchristos 		return (NULL);
798585484eSchristos 
808585484eSchristos 	/* we rely on the tagging sub system */
818585484eSchristos 	evtag_init();
828585484eSchristos 
838585484eSchristos 	TAILQ_INIT(&base->registered_rpcs);
848585484eSchristos 	TAILQ_INIT(&base->input_hooks);
858585484eSchristos 	TAILQ_INIT(&base->output_hooks);
868585484eSchristos 
878585484eSchristos 	TAILQ_INIT(&base->paused_requests);
888585484eSchristos 
898585484eSchristos 	base->http_server = http_server;
908585484eSchristos 
918585484eSchristos 	return (base);
928585484eSchristos }
938585484eSchristos 
948585484eSchristos void
958585484eSchristos evrpc_free(struct evrpc_base *base)
968585484eSchristos {
978585484eSchristos 	struct evrpc *rpc;
988585484eSchristos 	struct evrpc_hook *hook;
998585484eSchristos 	struct evrpc_hook_ctx *pause;
1008585484eSchristos 	int r;
1018585484eSchristos 
1028585484eSchristos 	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
1038585484eSchristos 		r = evrpc_unregister_rpc(base, rpc->uri);
1048585484eSchristos 		EVUTIL_ASSERT(r == 0);
1058585484eSchristos 	}
1068585484eSchristos 	while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
1078585484eSchristos 		TAILQ_REMOVE(&base->paused_requests, pause, next);
1088585484eSchristos 		mm_free(pause);
1098585484eSchristos 	}
1108585484eSchristos 	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
1118585484eSchristos 		r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
1128585484eSchristos 		EVUTIL_ASSERT(r);
1138585484eSchristos 	}
1148585484eSchristos 	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
1158585484eSchristos 		r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
1168585484eSchristos 		EVUTIL_ASSERT(r);
1178585484eSchristos 	}
1188585484eSchristos 	mm_free(base);
1198585484eSchristos }
1208585484eSchristos 
1218585484eSchristos void *
1228585484eSchristos evrpc_add_hook(void *vbase,
1238585484eSchristos     enum EVRPC_HOOK_TYPE hook_type,
1248585484eSchristos     int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
1258585484eSchristos     void *cb_arg)
1268585484eSchristos {
1278585484eSchristos 	struct evrpc_hooks_ *base = vbase;
1288585484eSchristos 	struct evrpc_hook_list *head = NULL;
1298585484eSchristos 	struct evrpc_hook *hook = NULL;
1308585484eSchristos 	switch (hook_type) {
1318585484eSchristos 	case EVRPC_INPUT:
1328585484eSchristos 		head = &base->in_hooks;
1338585484eSchristos 		break;
1348585484eSchristos 	case EVRPC_OUTPUT:
1358585484eSchristos 		head = &base->out_hooks;
1368585484eSchristos 		break;
1378585484eSchristos 	default:
1388585484eSchristos 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
1398585484eSchristos 	}
1408585484eSchristos 
1418585484eSchristos 	hook = mm_calloc(1, sizeof(struct evrpc_hook));
1428585484eSchristos 	EVUTIL_ASSERT(hook != NULL);
1438585484eSchristos 
1448585484eSchristos 	hook->process = cb;
1458585484eSchristos 	hook->process_arg = cb_arg;
1468585484eSchristos 	TAILQ_INSERT_TAIL(head, hook, next);
1478585484eSchristos 
1488585484eSchristos 	return (hook);
1498585484eSchristos }
1508585484eSchristos 
1518585484eSchristos static int
1528585484eSchristos evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
1538585484eSchristos {
1548585484eSchristos 	struct evrpc_hook *hook = NULL;
1558585484eSchristos 	TAILQ_FOREACH(hook, head, next) {
1568585484eSchristos 		if (hook == handle) {
1578585484eSchristos 			TAILQ_REMOVE(head, hook, next);
1588585484eSchristos 			mm_free(hook);
1598585484eSchristos 			return (1);
1608585484eSchristos 		}
1618585484eSchristos 	}
1628585484eSchristos 
1638585484eSchristos 	return (0);
1648585484eSchristos }
1658585484eSchristos 
1668585484eSchristos /*
1678585484eSchristos  * remove the hook specified by the handle
1688585484eSchristos  */
1698585484eSchristos 
1708585484eSchristos int
1718585484eSchristos evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
1728585484eSchristos {
1738585484eSchristos 	struct evrpc_hooks_ *base = vbase;
1748585484eSchristos 	struct evrpc_hook_list *head = NULL;
1758585484eSchristos 	switch (hook_type) {
1768585484eSchristos 	case EVRPC_INPUT:
1778585484eSchristos 		head = &base->in_hooks;
1788585484eSchristos 		break;
1798585484eSchristos 	case EVRPC_OUTPUT:
1808585484eSchristos 		head = &base->out_hooks;
1818585484eSchristos 		break;
1828585484eSchristos 	default:
1838585484eSchristos 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
1848585484eSchristos 	}
1858585484eSchristos 
1868585484eSchristos 	return (evrpc_remove_hook_internal(head, handle));
1878585484eSchristos }
1888585484eSchristos 
1898585484eSchristos static int
1908585484eSchristos evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
1918585484eSchristos     struct evhttp_request *req, struct evbuffer *evbuf)
1928585484eSchristos {
1938585484eSchristos 	struct evrpc_hook *hook;
1948585484eSchristos 	TAILQ_FOREACH(hook, head, next) {
1958585484eSchristos 		int res = hook->process(ctx, req, evbuf, hook->process_arg);
1968585484eSchristos 		if (res != EVRPC_CONTINUE)
1978585484eSchristos 			return (res);
1988585484eSchristos 	}
1998585484eSchristos 
2008585484eSchristos 	return (EVRPC_CONTINUE);
2018585484eSchristos }
2028585484eSchristos 
2038585484eSchristos static void evrpc_pool_schedule(struct evrpc_pool *pool);
2048585484eSchristos static void evrpc_request_cb(struct evhttp_request *, void *);
2058585484eSchristos 
2068585484eSchristos /*
2078585484eSchristos  * Registers a new RPC with the HTTP server.   The evrpc object is expected
2088585484eSchristos  * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
2098585484eSchristos  * calls this function.
2108585484eSchristos  */
2118585484eSchristos 
2128585484eSchristos static char *
2138585484eSchristos evrpc_construct_uri(const char *uri)
2148585484eSchristos {
2158585484eSchristos 	char *constructed_uri;
2168585484eSchristos 	size_t constructed_uri_len;
2178585484eSchristos 
2188585484eSchristos 	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
2198585484eSchristos 	if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
2208585484eSchristos 		event_err(1, "%s: failed to register rpc at %s",
2218585484eSchristos 		    __func__, uri);
2228585484eSchristos 	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
2238585484eSchristos 	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
2248585484eSchristos 	constructed_uri[constructed_uri_len - 1] = '\0';
2258585484eSchristos 
2268585484eSchristos 	return (constructed_uri);
2278585484eSchristos }
2288585484eSchristos 
2298585484eSchristos int
2308585484eSchristos evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
2318585484eSchristos     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
2328585484eSchristos {
2338585484eSchristos 	char *constructed_uri = evrpc_construct_uri(rpc->uri);
2348585484eSchristos 
2358585484eSchristos 	rpc->base = base;
2368585484eSchristos 	rpc->cb = cb;
2378585484eSchristos 	rpc->cb_arg = cb_arg;
2388585484eSchristos 
2398585484eSchristos 	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
2408585484eSchristos 
2418585484eSchristos 	evhttp_set_cb(base->http_server,
2428585484eSchristos 	    constructed_uri,
2438585484eSchristos 	    evrpc_request_cb,
2448585484eSchristos 	    rpc);
2458585484eSchristos 
2468585484eSchristos 	mm_free(constructed_uri);
2478585484eSchristos 
2488585484eSchristos 	return (0);
2498585484eSchristos }
2508585484eSchristos 
2518585484eSchristos int
2528585484eSchristos evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
2538585484eSchristos {
2548585484eSchristos 	char *registered_uri = NULL;
2558585484eSchristos 	struct evrpc *rpc;
2568585484eSchristos 	int r;
2578585484eSchristos 
2588585484eSchristos 	/* find the right rpc; linear search might be slow */
2598585484eSchristos 	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
2608585484eSchristos 		if (strcmp(rpc->uri, name) == 0)
2618585484eSchristos 			break;
2628585484eSchristos 	}
2638585484eSchristos 	if (rpc == NULL) {
2648585484eSchristos 		/* We did not find an RPC with this name */
2658585484eSchristos 		return (-1);
2668585484eSchristos 	}
2678585484eSchristos 	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
2688585484eSchristos 
2698585484eSchristos 	registered_uri = evrpc_construct_uri(name);
2708585484eSchristos 
2718585484eSchristos 	/* remove the http server callback */
2728585484eSchristos 	r = evhttp_del_cb(base->http_server, registered_uri);
2738585484eSchristos 	EVUTIL_ASSERT(r == 0);
2748585484eSchristos 
2758585484eSchristos 	mm_free(registered_uri);
2768585484eSchristos 
2778585484eSchristos 	mm_free((char *)rpc->uri);
2788585484eSchristos 	mm_free(rpc);
2798585484eSchristos 	return (0);
2808585484eSchristos }
2818585484eSchristos 
2828585484eSchristos static int evrpc_pause_request(void *vbase, void *ctx,
2838585484eSchristos     void (*cb)(void *, enum EVRPC_HOOK_RESULT));
2848585484eSchristos static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
2858585484eSchristos 
2868585484eSchristos static void
2878585484eSchristos evrpc_request_cb(struct evhttp_request *req, void *arg)
2888585484eSchristos {
2898585484eSchristos 	struct evrpc *rpc = arg;
2908585484eSchristos 	struct evrpc_req_generic *rpc_state = NULL;
2918585484eSchristos 
2928585484eSchristos 	/* let's verify the outside parameters */
2938585484eSchristos 	if (req->type != EVHTTP_REQ_POST ||
2948585484eSchristos 	    evbuffer_get_length(req->input_buffer) <= 0)
2958585484eSchristos 		goto error;
2968585484eSchristos 
2978585484eSchristos 	rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
2988585484eSchristos 	if (rpc_state == NULL)
2998585484eSchristos 		goto error;
3008585484eSchristos 	rpc_state->rpc = rpc;
3018585484eSchristos 	rpc_state->http_req = req;
3028585484eSchristos 	rpc_state->rpc_data = NULL;
3038585484eSchristos 
3048585484eSchristos 	if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
3058585484eSchristos 		int hook_res;
3068585484eSchristos 
3078585484eSchristos 		evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
3088585484eSchristos 
3098585484eSchristos 		/*
3108585484eSchristos 		 * allow hooks to modify the outgoing request
3118585484eSchristos 		 */
3128585484eSchristos 		hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
3138585484eSchristos 		    rpc_state, req, req->input_buffer);
3148585484eSchristos 		switch (hook_res) {
3158585484eSchristos 		case EVRPC_TERMINATE:
3168585484eSchristos 			goto error;
3178585484eSchristos 		case EVRPC_PAUSE:
3188585484eSchristos 			evrpc_pause_request(rpc->base, rpc_state,
3198585484eSchristos 			    evrpc_request_cb_closure);
3208585484eSchristos 			return;
3218585484eSchristos 		case EVRPC_CONTINUE:
3228585484eSchristos 			break;
3238585484eSchristos 		default:
3248585484eSchristos 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
3258585484eSchristos 			    hook_res == EVRPC_CONTINUE ||
3268585484eSchristos 			    hook_res == EVRPC_PAUSE);
3278585484eSchristos 		}
3288585484eSchristos 	}
3298585484eSchristos 
3308585484eSchristos 	evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
3318585484eSchristos 	return;
3328585484eSchristos 
3338585484eSchristos error:
334*eabc0478Schristos 	if (rpc_state)
3358585484eSchristos 		evrpc_reqstate_free_(rpc_state);
3368585484eSchristos 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
3378585484eSchristos 	return;
3388585484eSchristos }
3398585484eSchristos 
3408585484eSchristos static void
3418585484eSchristos evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
3428585484eSchristos {
3438585484eSchristos 	struct evrpc_req_generic *rpc_state = arg;
3448585484eSchristos 	struct evrpc *rpc;
3458585484eSchristos 	struct evhttp_request *req;
3468585484eSchristos 
3478585484eSchristos 	EVUTIL_ASSERT(rpc_state);
3488585484eSchristos 	rpc = rpc_state->rpc;
3498585484eSchristos 	req = rpc_state->http_req;
3508585484eSchristos 
3518585484eSchristos 	if (hook_res == EVRPC_TERMINATE)
3528585484eSchristos 		goto error;
3538585484eSchristos 
3548585484eSchristos 	/* let's check that we can parse the request */
3558585484eSchristos 	rpc_state->request = rpc->request_new(rpc->request_new_arg);
3568585484eSchristos 	if (rpc_state->request == NULL)
3578585484eSchristos 		goto error;
3588585484eSchristos 
3598585484eSchristos 	if (rpc->request_unmarshal(
3608585484eSchristos 		    rpc_state->request, req->input_buffer) == -1) {
3618585484eSchristos 		/* we failed to parse the request; that's a bummer */
3628585484eSchristos 		goto error;
3638585484eSchristos 	}
3648585484eSchristos 
3658585484eSchristos 	/* at this point, we have a well formed request, prepare the reply */
3668585484eSchristos 
3678585484eSchristos 	rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
3688585484eSchristos 	if (rpc_state->reply == NULL)
3698585484eSchristos 		goto error;
3708585484eSchristos 
3718585484eSchristos 	/* give the rpc to the user; they can deal with it */
3728585484eSchristos 	rpc->cb(rpc_state, rpc->cb_arg);
3738585484eSchristos 
3748585484eSchristos 	return;
3758585484eSchristos 
3768585484eSchristos error:
3778585484eSchristos 	evrpc_reqstate_free_(rpc_state);
3788585484eSchristos 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
3798585484eSchristos 	return;
3808585484eSchristos }
3818585484eSchristos 
3828585484eSchristos 
3838585484eSchristos void
3848585484eSchristos evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state)
3858585484eSchristos {
3868585484eSchristos 	struct evrpc *rpc;
3878585484eSchristos 	EVUTIL_ASSERT(rpc_state != NULL);
3888585484eSchristos 	rpc = rpc_state->rpc;
3898585484eSchristos 
3908585484eSchristos 	/* clean up all memory */
3918585484eSchristos 	if (rpc_state->hook_meta != NULL)
3928585484eSchristos 		evrpc_hook_context_free_(rpc_state->hook_meta);
3938585484eSchristos 	if (rpc_state->request != NULL)
3948585484eSchristos 		rpc->request_free(rpc_state->request);
3958585484eSchristos 	if (rpc_state->reply != NULL)
3968585484eSchristos 		rpc->reply_free(rpc_state->reply);
3978585484eSchristos 	if (rpc_state->rpc_data != NULL)
3988585484eSchristos 		evbuffer_free(rpc_state->rpc_data);
3998585484eSchristos 	mm_free(rpc_state);
4008585484eSchristos }
4018585484eSchristos 
4028585484eSchristos static void
4038585484eSchristos evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
4048585484eSchristos 
4058585484eSchristos void
4068585484eSchristos evrpc_request_done(struct evrpc_req_generic *rpc_state)
4078585484eSchristos {
4088585484eSchristos 	struct evhttp_request *req;
4098585484eSchristos 	struct evrpc *rpc;
4108585484eSchristos 
4118585484eSchristos 	EVUTIL_ASSERT(rpc_state);
4128585484eSchristos 
4138585484eSchristos 	req = rpc_state->http_req;
4148585484eSchristos 	rpc = rpc_state->rpc;
4158585484eSchristos 
4168585484eSchristos 	if (rpc->reply_complete(rpc_state->reply) == -1) {
4178585484eSchristos 		/* the reply was not completely filled in.  error out */
4188585484eSchristos 		goto error;
4198585484eSchristos 	}
4208585484eSchristos 
4218585484eSchristos 	if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
4228585484eSchristos 		/* out of memory */
4238585484eSchristos 		goto error;
4248585484eSchristos 	}
4258585484eSchristos 
4268585484eSchristos 	/* serialize the reply */
4278585484eSchristos 	rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
4288585484eSchristos 
4298585484eSchristos 	if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
4308585484eSchristos 		int hook_res;
4318585484eSchristos 
4328585484eSchristos 		evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
4338585484eSchristos 
4348585484eSchristos 		/* do hook based tweaks to the request */
4358585484eSchristos 		hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
4368585484eSchristos 		    rpc_state, req, rpc_state->rpc_data);
4378585484eSchristos 		switch (hook_res) {
4388585484eSchristos 		case EVRPC_TERMINATE:
4398585484eSchristos 			goto error;
4408585484eSchristos 		case EVRPC_PAUSE:
4418585484eSchristos 			if (evrpc_pause_request(rpc->base, rpc_state,
4428585484eSchristos 				evrpc_request_done_closure) == -1)
4438585484eSchristos 				goto error;
4448585484eSchristos 			return;
4458585484eSchristos 		case EVRPC_CONTINUE:
4468585484eSchristos 			break;
4478585484eSchristos 		default:
4488585484eSchristos 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
4498585484eSchristos 			    hook_res == EVRPC_CONTINUE ||
4508585484eSchristos 			    hook_res == EVRPC_PAUSE);
4518585484eSchristos 		}
4528585484eSchristos 	}
4538585484eSchristos 
4548585484eSchristos 	evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
4558585484eSchristos 	return;
4568585484eSchristos 
4578585484eSchristos error:
4588585484eSchristos 	evrpc_reqstate_free_(rpc_state);
4598585484eSchristos 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
4608585484eSchristos 	return;
4618585484eSchristos }
4628585484eSchristos 
4638585484eSchristos void *
4648585484eSchristos evrpc_get_request(struct evrpc_req_generic *req)
4658585484eSchristos {
4668585484eSchristos 	return req->request;
4678585484eSchristos }
4688585484eSchristos 
4698585484eSchristos void *
4708585484eSchristos evrpc_get_reply(struct evrpc_req_generic *req)
4718585484eSchristos {
4728585484eSchristos 	return req->reply;
4738585484eSchristos }
4748585484eSchristos 
4758585484eSchristos static void
4768585484eSchristos evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
4778585484eSchristos {
4788585484eSchristos 	struct evrpc_req_generic *rpc_state = arg;
4798585484eSchristos 	struct evhttp_request *req;
4808585484eSchristos 	EVUTIL_ASSERT(rpc_state);
4818585484eSchristos 	req = rpc_state->http_req;
4828585484eSchristos 
4838585484eSchristos 	if (hook_res == EVRPC_TERMINATE)
4848585484eSchristos 		goto error;
4858585484eSchristos 
4868585484eSchristos 	/* on success, we are going to transmit marshaled binary data */
4878585484eSchristos 	if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
4888585484eSchristos 		evhttp_add_header(req->output_headers,
4898585484eSchristos 		    "Content-Type", "application/octet-stream");
4908585484eSchristos 	}
4918585484eSchristos 	evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
4928585484eSchristos 
4938585484eSchristos 	evrpc_reqstate_free_(rpc_state);
4948585484eSchristos 
4958585484eSchristos 	return;
4968585484eSchristos 
4978585484eSchristos error:
4988585484eSchristos 	evrpc_reqstate_free_(rpc_state);
4998585484eSchristos 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
5008585484eSchristos 	return;
5018585484eSchristos }
5028585484eSchristos 
5038585484eSchristos 
5048585484eSchristos /* Client implementation of RPC site */
5058585484eSchristos 
5068585484eSchristos static int evrpc_schedule_request(struct evhttp_connection *connection,
5078585484eSchristos     struct evrpc_request_wrapper *ctx);
5088585484eSchristos 
5098585484eSchristos struct evrpc_pool *
5108585484eSchristos evrpc_pool_new(struct event_base *base)
5118585484eSchristos {
5128585484eSchristos 	struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
5138585484eSchristos 	if (pool == NULL)
5148585484eSchristos 		return (NULL);
5158585484eSchristos 
5168585484eSchristos 	TAILQ_INIT(&pool->connections);
5178585484eSchristos 	TAILQ_INIT(&pool->requests);
5188585484eSchristos 
5198585484eSchristos 	TAILQ_INIT(&pool->paused_requests);
5208585484eSchristos 
5218585484eSchristos 	TAILQ_INIT(&pool->input_hooks);
5228585484eSchristos 	TAILQ_INIT(&pool->output_hooks);
5238585484eSchristos 
5248585484eSchristos 	pool->base = base;
5258585484eSchristos 	pool->timeout = -1;
5268585484eSchristos 
5278585484eSchristos 	return (pool);
5288585484eSchristos }
5298585484eSchristos 
5308585484eSchristos static void
5318585484eSchristos evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
5328585484eSchristos {
5338585484eSchristos 	if (request->hook_meta != NULL)
5348585484eSchristos 		evrpc_hook_context_free_(request->hook_meta);
5358585484eSchristos 	mm_free(request->name);
5368585484eSchristos 	mm_free(request);
5378585484eSchristos }
5388585484eSchristos 
5398585484eSchristos void
5408585484eSchristos evrpc_pool_free(struct evrpc_pool *pool)
5418585484eSchristos {
5428585484eSchristos 	struct evhttp_connection *connection;
5438585484eSchristos 	struct evrpc_request_wrapper *request;
5448585484eSchristos 	struct evrpc_hook_ctx *pause;
5458585484eSchristos 	struct evrpc_hook *hook;
5468585484eSchristos 	int r;
5478585484eSchristos 
5488585484eSchristos 	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
5498585484eSchristos 		TAILQ_REMOVE(&pool->requests, request, next);
5508585484eSchristos 		evrpc_request_wrapper_free(request);
5518585484eSchristos 	}
5528585484eSchristos 
5538585484eSchristos 	while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
5548585484eSchristos 		TAILQ_REMOVE(&pool->paused_requests, pause, next);
5558585484eSchristos 		mm_free(pause);
5568585484eSchristos 	}
5578585484eSchristos 
5588585484eSchristos 	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
5598585484eSchristos 		TAILQ_REMOVE(&pool->connections, connection, next);
5608585484eSchristos 		evhttp_connection_free(connection);
5618585484eSchristos 	}
5628585484eSchristos 
5638585484eSchristos 	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
5648585484eSchristos 		r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
5658585484eSchristos 		EVUTIL_ASSERT(r);
5668585484eSchristos 	}
5678585484eSchristos 
5688585484eSchristos 	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
5698585484eSchristos 		r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
5708585484eSchristos 		EVUTIL_ASSERT(r);
5718585484eSchristos 	}
5728585484eSchristos 
5738585484eSchristos 	mm_free(pool);
5748585484eSchristos }
5758585484eSchristos 
5768585484eSchristos /*
5778585484eSchristos  * Add a connection to the RPC pool.   A request scheduled on the pool
5788585484eSchristos  * may use any available connection.
5798585484eSchristos  */
5808585484eSchristos 
5818585484eSchristos void
5828585484eSchristos evrpc_pool_add_connection(struct evrpc_pool *pool,
5838585484eSchristos     struct evhttp_connection *connection)
5848585484eSchristos {
5858585484eSchristos 	EVUTIL_ASSERT(connection->http_server == NULL);
5868585484eSchristos 	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
5878585484eSchristos 
5888585484eSchristos 	/*
5898585484eSchristos 	 * associate an event base with this connection
5908585484eSchristos 	 */
5918585484eSchristos 	if (pool->base != NULL)
5928585484eSchristos 		evhttp_connection_set_base(connection, pool->base);
5938585484eSchristos 
5948585484eSchristos 	/*
5958585484eSchristos 	 * unless a timeout was specifically set for a connection,
5968585484eSchristos 	 * the connection inherits the timeout from the pool.
5978585484eSchristos 	 */
5988585484eSchristos 	if (!evutil_timerisset(&connection->timeout))
5998585484eSchristos 		evhttp_connection_set_timeout(connection, pool->timeout);
6008585484eSchristos 
6018585484eSchristos 	/*
6028585484eSchristos 	 * if we have any requests pending, schedule them with the new
6038585484eSchristos 	 * connections.
6048585484eSchristos 	 */
6058585484eSchristos 
6068585484eSchristos 	if (TAILQ_FIRST(&pool->requests) != NULL) {
6078585484eSchristos 		struct evrpc_request_wrapper *request =
6088585484eSchristos 		    TAILQ_FIRST(&pool->requests);
6098585484eSchristos 		TAILQ_REMOVE(&pool->requests, request, next);
6108585484eSchristos 		evrpc_schedule_request(connection, request);
6118585484eSchristos 	}
6128585484eSchristos }
6138585484eSchristos 
6148585484eSchristos void
6158585484eSchristos evrpc_pool_remove_connection(struct evrpc_pool *pool,
6168585484eSchristos     struct evhttp_connection *connection)
6178585484eSchristos {
6188585484eSchristos 	TAILQ_REMOVE(&pool->connections, connection, next);
6198585484eSchristos }
6208585484eSchristos 
6218585484eSchristos void
6228585484eSchristos evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
6238585484eSchristos {
6248585484eSchristos 	struct evhttp_connection *evcon;
6258585484eSchristos 	TAILQ_FOREACH(evcon, &pool->connections, next) {
6268585484eSchristos 		evhttp_connection_set_timeout(evcon, timeout_in_secs);
6278585484eSchristos 	}
6288585484eSchristos 	pool->timeout = timeout_in_secs;
6298585484eSchristos }
6308585484eSchristos 
6318585484eSchristos 
6328585484eSchristos static void evrpc_reply_done(struct evhttp_request *, void *);
6338585484eSchristos static void evrpc_request_timeout(evutil_socket_t, short, void *);
6348585484eSchristos 
6358585484eSchristos /*
6368585484eSchristos  * Finds a connection object associated with the pool that is currently
6378585484eSchristos  * idle and can be used to make a request.
6388585484eSchristos  */
6398585484eSchristos static struct evhttp_connection *
6408585484eSchristos evrpc_pool_find_connection(struct evrpc_pool *pool)
6418585484eSchristos {
6428585484eSchristos 	struct evhttp_connection *connection;
6438585484eSchristos 	TAILQ_FOREACH(connection, &pool->connections, next) {
6448585484eSchristos 		if (TAILQ_FIRST(&connection->requests) == NULL)
6458585484eSchristos 			return (connection);
6468585484eSchristos 	}
6478585484eSchristos 
6488585484eSchristos 	return (NULL);
6498585484eSchristos }
6508585484eSchristos 
6518585484eSchristos /*
6528585484eSchristos  * Prototypes responsible for evrpc scheduling and hooking
6538585484eSchristos  */
6548585484eSchristos 
6558585484eSchristos static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
6568585484eSchristos 
6578585484eSchristos /*
6588585484eSchristos  * We assume that the ctx is no longer queued on the pool.
6598585484eSchristos  */
6608585484eSchristos static int
6618585484eSchristos evrpc_schedule_request(struct evhttp_connection *connection,
6628585484eSchristos     struct evrpc_request_wrapper *ctx)
6638585484eSchristos {
6648585484eSchristos 	struct evhttp_request *req = NULL;
6658585484eSchristos 	struct evrpc_pool *pool = ctx->pool;
6668585484eSchristos 	struct evrpc_status status;
6678585484eSchristos 
6688585484eSchristos 	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
6698585484eSchristos 		goto error;
6708585484eSchristos 
6718585484eSchristos 	/* serialize the request data into the output buffer */
6728585484eSchristos 	ctx->request_marshal(req->output_buffer, ctx->request);
6738585484eSchristos 
6748585484eSchristos 	/* we need to know the connection that we might have to abort */
6758585484eSchristos 	ctx->evcon = connection;
6768585484eSchristos 
6778585484eSchristos 	/* if we get paused we also need to know the request */
6788585484eSchristos 	ctx->req = req;
6798585484eSchristos 
6808585484eSchristos 	if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
6818585484eSchristos 		int hook_res;
6828585484eSchristos 
6838585484eSchristos 		evrpc_hook_associate_meta_(&ctx->hook_meta, connection);
6848585484eSchristos 
6858585484eSchristos 		/* apply hooks to the outgoing request */
6868585484eSchristos 		hook_res = evrpc_process_hooks(&pool->output_hooks,
6878585484eSchristos 		    ctx, req, req->output_buffer);
6888585484eSchristos 
6898585484eSchristos 		switch (hook_res) {
6908585484eSchristos 		case EVRPC_TERMINATE:
6918585484eSchristos 			goto error;
6928585484eSchristos 		case EVRPC_PAUSE:
6938585484eSchristos 			/* we need to be explicitly resumed */
6948585484eSchristos 			if (evrpc_pause_request(pool, ctx,
6958585484eSchristos 				evrpc_schedule_request_closure) == -1)
6968585484eSchristos 				goto error;
6978585484eSchristos 			return (0);
6988585484eSchristos 		case EVRPC_CONTINUE:
6998585484eSchristos 			/* we can just continue */
7008585484eSchristos 			break;
7018585484eSchristos 		default:
7028585484eSchristos 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
7038585484eSchristos 			    hook_res == EVRPC_CONTINUE ||
7048585484eSchristos 			    hook_res == EVRPC_PAUSE);
7058585484eSchristos 		}
7068585484eSchristos 	}
7078585484eSchristos 
7088585484eSchristos 	evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
7098585484eSchristos 	return (0);
7108585484eSchristos 
7118585484eSchristos error:
7128585484eSchristos 	memset(&status, 0, sizeof(status));
7138585484eSchristos 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
7148585484eSchristos 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
7158585484eSchristos 	evrpc_request_wrapper_free(ctx);
7168585484eSchristos 	return (-1);
7178585484eSchristos }
7188585484eSchristos 
7198585484eSchristos static void
7208585484eSchristos evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
7218585484eSchristos {
7228585484eSchristos 	struct evrpc_request_wrapper *ctx = arg;
7238585484eSchristos 	struct evhttp_connection *connection = ctx->evcon;
7248585484eSchristos 	struct evhttp_request *req = ctx->req;
7258585484eSchristos 	struct evrpc_pool *pool = ctx->pool;
7268585484eSchristos 	struct evrpc_status status;
7278585484eSchristos 	char *uri = NULL;
7288585484eSchristos 	int res = 0;
7298585484eSchristos 
7308585484eSchristos 	if (hook_res == EVRPC_TERMINATE)
7318585484eSchristos 		goto error;
7328585484eSchristos 
7338585484eSchristos 	uri = evrpc_construct_uri(ctx->name);
7348585484eSchristos 	if (uri == NULL)
7358585484eSchristos 		goto error;
7368585484eSchristos 
7378585484eSchristos 	if (pool->timeout > 0) {
7388585484eSchristos 		/*
7398585484eSchristos 		 * a timeout after which the whole rpc is going to be aborted.
7408585484eSchristos 		 */
7418585484eSchristos 		struct timeval tv;
7428585484eSchristos 		evutil_timerclear(&tv);
7438585484eSchristos 		tv.tv_sec = pool->timeout;
7448585484eSchristos 		evtimer_add(&ctx->ev_timeout, &tv);
7458585484eSchristos 	}
7468585484eSchristos 
7478585484eSchristos 	/* start the request over the connection */
7488585484eSchristos 	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
7498585484eSchristos 	mm_free(uri);
7508585484eSchristos 
7518585484eSchristos 	if (res == -1)
7528585484eSchristos 		goto error;
7538585484eSchristos 
7548585484eSchristos 	return;
7558585484eSchristos 
7568585484eSchristos error:
7578585484eSchristos 	memset(&status, 0, sizeof(status));
7588585484eSchristos 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
7598585484eSchristos 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
7608585484eSchristos 	evrpc_request_wrapper_free(ctx);
7618585484eSchristos }
7628585484eSchristos 
7638585484eSchristos /* we just queue the paused request on the pool under the req object */
7648585484eSchristos static int
7658585484eSchristos evrpc_pause_request(void *vbase, void *ctx,
7668585484eSchristos     void (*cb)(void *, enum EVRPC_HOOK_RESULT))
7678585484eSchristos {
7688585484eSchristos 	struct evrpc_hooks_ *base = vbase;
7698585484eSchristos 	struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
7708585484eSchristos 	if (pause == NULL)
7718585484eSchristos 		return (-1);
7728585484eSchristos 
7738585484eSchristos 	pause->ctx = ctx;
7748585484eSchristos 	pause->cb = cb;
7758585484eSchristos 
7768585484eSchristos 	TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
7778585484eSchristos 	return (0);
7788585484eSchristos }
7798585484eSchristos 
7808585484eSchristos int
7818585484eSchristos evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
7828585484eSchristos {
7838585484eSchristos 	struct evrpc_hooks_ *base = vbase;
7848585484eSchristos 	struct evrpc_pause_list *head = &base->pause_requests;
7858585484eSchristos 	struct evrpc_hook_ctx *pause;
7868585484eSchristos 
7878585484eSchristos 	TAILQ_FOREACH(pause, head, next) {
7888585484eSchristos 		if (pause->ctx == ctx)
7898585484eSchristos 			break;
7908585484eSchristos 	}
7918585484eSchristos 
7928585484eSchristos 	if (pause == NULL)
7938585484eSchristos 		return (-1);
7948585484eSchristos 
7958585484eSchristos 	(*pause->cb)(pause->ctx, res);
7968585484eSchristos 	TAILQ_REMOVE(head, pause, next);
7978585484eSchristos 	mm_free(pause);
7988585484eSchristos 	return (0);
7998585484eSchristos }
8008585484eSchristos 
8018585484eSchristos int
8028585484eSchristos evrpc_make_request(struct evrpc_request_wrapper *ctx)
8038585484eSchristos {
8048585484eSchristos 	struct evrpc_pool *pool = ctx->pool;
8058585484eSchristos 
8068585484eSchristos 	/* initialize the event structure for this rpc */
8078585484eSchristos 	evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
8088585484eSchristos 
8098585484eSchristos 	/* we better have some available connections on the pool */
8108585484eSchristos 	EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
8118585484eSchristos 
8128585484eSchristos 	/*
8138585484eSchristos 	 * if no connection is available, we queue the request on the pool,
8148585484eSchristos 	 * the next time a connection is empty, the rpc will be send on that.
8158585484eSchristos 	 */
8168585484eSchristos 	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
8178585484eSchristos 
8188585484eSchristos 	evrpc_pool_schedule(pool);
8198585484eSchristos 
8208585484eSchristos 	return (0);
8218585484eSchristos }
8228585484eSchristos 
8238585484eSchristos 
8248585484eSchristos struct evrpc_request_wrapper *
8258585484eSchristos evrpc_make_request_ctx(
8268585484eSchristos 	struct evrpc_pool *pool, void *request, void *reply,
8278585484eSchristos 	const char *rpcname,
8288585484eSchristos 	void (*req_marshal)(struct evbuffer*, void *),
8298585484eSchristos 	void (*rpl_clear)(void *),
8308585484eSchristos 	int (*rpl_unmarshal)(void *, struct evbuffer *),
8318585484eSchristos 	void (*cb)(struct evrpc_status *, void *, void *, void *),
8328585484eSchristos 	void *cbarg)
8338585484eSchristos {
8348585484eSchristos 	struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
8358585484eSchristos 	    mm_malloc(sizeof(struct evrpc_request_wrapper));
8368585484eSchristos 	if (ctx == NULL)
8378585484eSchristos 		return (NULL);
8388585484eSchristos 
8398585484eSchristos 	ctx->pool = pool;
8408585484eSchristos 	ctx->hook_meta = NULL;
8418585484eSchristos 	ctx->evcon = NULL;
8428585484eSchristos 	ctx->name = mm_strdup(rpcname);
8438585484eSchristos 	if (ctx->name == NULL) {
8448585484eSchristos 		mm_free(ctx);
8458585484eSchristos 		return (NULL);
8468585484eSchristos 	}
8478585484eSchristos 	ctx->cb = cb;
8488585484eSchristos 	ctx->cb_arg = cbarg;
8498585484eSchristos 	ctx->request = request;
8508585484eSchristos 	ctx->reply = reply;
8518585484eSchristos 	ctx->request_marshal = req_marshal;
8528585484eSchristos 	ctx->reply_clear = rpl_clear;
8538585484eSchristos 	ctx->reply_unmarshal = rpl_unmarshal;
8548585484eSchristos 
8558585484eSchristos 	return (ctx);
8568585484eSchristos }
8578585484eSchristos 
8588585484eSchristos static void
8598585484eSchristos evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
8608585484eSchristos 
8618585484eSchristos static void
8628585484eSchristos evrpc_reply_done(struct evhttp_request *req, void *arg)
8638585484eSchristos {
8648585484eSchristos 	struct evrpc_request_wrapper *ctx = arg;
8658585484eSchristos 	struct evrpc_pool *pool = ctx->pool;
8668585484eSchristos 	int hook_res = EVRPC_CONTINUE;
8678585484eSchristos 
8688585484eSchristos 	/* cancel any timeout we might have scheduled */
8698585484eSchristos 	event_del(&ctx->ev_timeout);
8708585484eSchristos 
8718585484eSchristos 	ctx->req = req;
8728585484eSchristos 
8738585484eSchristos 	/* we need to get the reply now */
8748585484eSchristos 	if (req == NULL) {
8758585484eSchristos 		evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
8768585484eSchristos 		return;
8778585484eSchristos 	}
8788585484eSchristos 
8798585484eSchristos 	if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
8808585484eSchristos 		evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon);
8818585484eSchristos 
8828585484eSchristos 		/* apply hooks to the incoming request */
8838585484eSchristos 		hook_res = evrpc_process_hooks(&pool->input_hooks,
8848585484eSchristos 		    ctx, req, req->input_buffer);
8858585484eSchristos 
8868585484eSchristos 		switch (hook_res) {
8878585484eSchristos 		case EVRPC_TERMINATE:
8888585484eSchristos 		case EVRPC_CONTINUE:
8898585484eSchristos 			break;
8908585484eSchristos 		case EVRPC_PAUSE:
8918585484eSchristos 			/*
8928585484eSchristos 			 * if we get paused we also need to know the
8938585484eSchristos 			 * request.  unfortunately, the underlying
8948585484eSchristos 			 * layer is going to free it.  we need to
8958585484eSchristos 			 * request ownership explicitly
8968585484eSchristos 			 */
8978585484eSchristos 			evhttp_request_own(req);
8988585484eSchristos 
8998585484eSchristos 			evrpc_pause_request(pool, ctx,
9008585484eSchristos 			    evrpc_reply_done_closure);
9018585484eSchristos 			return;
9028585484eSchristos 		default:
9038585484eSchristos 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
9048585484eSchristos 			    hook_res == EVRPC_CONTINUE ||
9058585484eSchristos 			    hook_res == EVRPC_PAUSE);
9068585484eSchristos 		}
9078585484eSchristos 	}
9088585484eSchristos 
9098585484eSchristos 	evrpc_reply_done_closure(ctx, hook_res);
9108585484eSchristos 
9118585484eSchristos 	/* http request is being freed by underlying layer */
9128585484eSchristos }
9138585484eSchristos 
9148585484eSchristos static void
9158585484eSchristos evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
9168585484eSchristos {
9178585484eSchristos 	struct evrpc_request_wrapper *ctx = arg;
9188585484eSchristos 	struct evhttp_request *req = ctx->req;
9198585484eSchristos 	struct evrpc_pool *pool = ctx->pool;
9208585484eSchristos 	struct evrpc_status status;
9218585484eSchristos 	int res = -1;
9228585484eSchristos 
9238585484eSchristos 	memset(&status, 0, sizeof(status));
9248585484eSchristos 	status.http_req = req;
9258585484eSchristos 
9268585484eSchristos 	/* we need to get the reply now */
9278585484eSchristos 	if (req == NULL) {
9288585484eSchristos 		status.error = EVRPC_STATUS_ERR_TIMEOUT;
9298585484eSchristos 	} else if (hook_res == EVRPC_TERMINATE) {
9308585484eSchristos 		status.error = EVRPC_STATUS_ERR_HOOKABORTED;
9318585484eSchristos 	} else {
9328585484eSchristos 		res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
9338585484eSchristos 		if (res == -1)
9348585484eSchristos 			status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
9358585484eSchristos 	}
9368585484eSchristos 
9378585484eSchristos 	if (res == -1) {
9388585484eSchristos 		/* clear everything that we might have written previously */
9398585484eSchristos 		ctx->reply_clear(ctx->reply);
9408585484eSchristos 	}
9418585484eSchristos 
9428585484eSchristos 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
9438585484eSchristos 
9448585484eSchristos 	evrpc_request_wrapper_free(ctx);
9458585484eSchristos 
9468585484eSchristos 	/* the http layer owned the original request structure, but if we
9478585484eSchristos 	 * got paused, we asked for ownership and need to free it here. */
9488585484eSchristos 	if (req != NULL && evhttp_request_is_owned(req))
9498585484eSchristos 		evhttp_request_free(req);
9508585484eSchristos 
9518585484eSchristos 	/* see if we can schedule another request */
9528585484eSchristos 	evrpc_pool_schedule(pool);
9538585484eSchristos }
9548585484eSchristos 
9558585484eSchristos static void
9568585484eSchristos evrpc_pool_schedule(struct evrpc_pool *pool)
9578585484eSchristos {
9588585484eSchristos 	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
9598585484eSchristos 	struct evhttp_connection *evcon;
9608585484eSchristos 
9618585484eSchristos 	/* if no requests are pending, we have no work */
9628585484eSchristos 	if (ctx == NULL)
9638585484eSchristos 		return;
9648585484eSchristos 
9658585484eSchristos 	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
9668585484eSchristos 		TAILQ_REMOVE(&pool->requests, ctx, next);
9678585484eSchristos 		evrpc_schedule_request(evcon, ctx);
9688585484eSchristos 	}
9698585484eSchristos }
9708585484eSchristos 
9718585484eSchristos static void
9728585484eSchristos evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
9738585484eSchristos {
9748585484eSchristos 	struct evrpc_request_wrapper *ctx = arg;
9758585484eSchristos 	struct evhttp_connection *evcon = ctx->evcon;
9768585484eSchristos 	EVUTIL_ASSERT(evcon != NULL);
9778585484eSchristos 
978b8ecfcfeSchristos 	evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT);
9798585484eSchristos }
9808585484eSchristos 
9818585484eSchristos /*
9828585484eSchristos  * frees potential meta data associated with a request.
9838585484eSchristos  */
9848585484eSchristos 
9858585484eSchristos static void
9868585484eSchristos evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
9878585484eSchristos {
9888585484eSchristos 	struct evrpc_meta *entry;
9898585484eSchristos 	EVUTIL_ASSERT(meta_data != NULL);
9908585484eSchristos 
9918585484eSchristos 	while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
9928585484eSchristos 		TAILQ_REMOVE(meta_data, entry, next);
9938585484eSchristos 		mm_free(entry->key);
9948585484eSchristos 		mm_free(entry->data);
9958585484eSchristos 		mm_free(entry);
9968585484eSchristos 	}
9978585484eSchristos }
9988585484eSchristos 
9998585484eSchristos static struct evrpc_hook_meta *
10008585484eSchristos evrpc_hook_meta_new_(void)
10018585484eSchristos {
10028585484eSchristos 	struct evrpc_hook_meta *ctx;
10038585484eSchristos 	ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
10048585484eSchristos 	EVUTIL_ASSERT(ctx != NULL);
10058585484eSchristos 
10068585484eSchristos 	TAILQ_INIT(&ctx->meta_data);
10078585484eSchristos 	ctx->evcon = NULL;
10088585484eSchristos 
10098585484eSchristos 	return (ctx);
10108585484eSchristos }
10118585484eSchristos 
10128585484eSchristos static void
10138585484eSchristos evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx,
10148585484eSchristos     struct evhttp_connection *evcon)
10158585484eSchristos {
10168585484eSchristos 	struct evrpc_hook_meta *ctx = *pctx;
10178585484eSchristos 	if (ctx == NULL)
10188585484eSchristos 		*pctx = ctx = evrpc_hook_meta_new_();
10198585484eSchristos 	ctx->evcon = evcon;
10208585484eSchristos }
10218585484eSchristos 
10228585484eSchristos static void
10238585484eSchristos evrpc_hook_context_free_(struct evrpc_hook_meta *ctx)
10248585484eSchristos {
10258585484eSchristos 	evrpc_meta_data_free(&ctx->meta_data);
10268585484eSchristos 	mm_free(ctx);
10278585484eSchristos }
10288585484eSchristos 
10298585484eSchristos /* Adds meta data */
10308585484eSchristos void
10318585484eSchristos evrpc_hook_add_meta(void *ctx, const char *key,
10328585484eSchristos     const void *data, size_t data_size)
10338585484eSchristos {
10348585484eSchristos 	struct evrpc_request_wrapper *req = ctx;
10358585484eSchristos 	struct evrpc_hook_meta *store = NULL;
10368585484eSchristos 	struct evrpc_meta *meta = NULL;
10378585484eSchristos 
10388585484eSchristos 	if ((store = req->hook_meta) == NULL)
10398585484eSchristos 		store = req->hook_meta = evrpc_hook_meta_new_();
10408585484eSchristos 
10418585484eSchristos 	meta = mm_malloc(sizeof(struct evrpc_meta));
10428585484eSchristos 	EVUTIL_ASSERT(meta != NULL);
10438585484eSchristos 	meta->key = mm_strdup(key);
10448585484eSchristos 	EVUTIL_ASSERT(meta->key != NULL);
10458585484eSchristos 	meta->data_size = data_size;
10468585484eSchristos 	meta->data = mm_malloc(data_size);
10478585484eSchristos 	EVUTIL_ASSERT(meta->data != NULL);
10488585484eSchristos 	memcpy(meta->data, data, data_size);
10498585484eSchristos 
10508585484eSchristos 	TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
10518585484eSchristos }
10528585484eSchristos 
10538585484eSchristos int
10548585484eSchristos evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
10558585484eSchristos {
10568585484eSchristos 	struct evrpc_request_wrapper *req = ctx;
10578585484eSchristos 	struct evrpc_meta *meta = NULL;
10588585484eSchristos 
10598585484eSchristos 	if (req->hook_meta == NULL)
10608585484eSchristos 		return (-1);
10618585484eSchristos 
10628585484eSchristos 	TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
10638585484eSchristos 		if (strcmp(meta->key, key) == 0) {
10648585484eSchristos 			*data = meta->data;
10658585484eSchristos 			*data_size = meta->data_size;
10668585484eSchristos 			return (0);
10678585484eSchristos 		}
10688585484eSchristos 	}
10698585484eSchristos 
10708585484eSchristos 	return (-1);
10718585484eSchristos }
10728585484eSchristos 
10738585484eSchristos struct evhttp_connection *
10748585484eSchristos evrpc_hook_get_connection(void *ctx)
10758585484eSchristos {
10768585484eSchristos 	struct evrpc_request_wrapper *req = ctx;
10778585484eSchristos 	return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
10788585484eSchristos }
10798585484eSchristos 
10808585484eSchristos int
10818585484eSchristos evrpc_send_request_generic(struct evrpc_pool *pool,
10828585484eSchristos     void *request, void *reply,
10838585484eSchristos     void (*cb)(struct evrpc_status *, void *, void *, void *),
10848585484eSchristos     void *cb_arg,
10858585484eSchristos     const char *rpcname,
10868585484eSchristos     void (*req_marshal)(struct evbuffer *, void *),
10878585484eSchristos     void (*rpl_clear)(void *),
10888585484eSchristos     int (*rpl_unmarshal)(void *, struct evbuffer *))
10898585484eSchristos {
10908585484eSchristos 	struct evrpc_status status;
10918585484eSchristos 	struct evrpc_request_wrapper *ctx;
10928585484eSchristos 	ctx = evrpc_make_request_ctx(pool, request, reply,
10938585484eSchristos 	    rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
10948585484eSchristos 	if (ctx == NULL)
10958585484eSchristos 		goto error;
10968585484eSchristos 	return (evrpc_make_request(ctx));
10978585484eSchristos error:
10988585484eSchristos 	memset(&status, 0, sizeof(status));
10998585484eSchristos 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
11008585484eSchristos 	(*(cb))(&status, request, reply, cb_arg);
11018585484eSchristos 	return (-1);
11028585484eSchristos }
11038585484eSchristos 
11048585484eSchristos /** Takes a request object and fills it in with the right magic */
11058585484eSchristos static struct evrpc *
11068585484eSchristos evrpc_register_object(const char *name,
11078585484eSchristos     void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
11088585484eSchristos     int (*req_unmarshal)(void *, struct evbuffer *),
11098585484eSchristos     void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
11108585484eSchristos     int (*rpl_complete)(void *),
11118585484eSchristos     void (*rpl_marshal)(struct evbuffer *, void *))
11128585484eSchristos {
11138585484eSchristos 	struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
11148585484eSchristos 	if (rpc == NULL)
11158585484eSchristos 		return (NULL);
11168585484eSchristos 	rpc->uri = mm_strdup(name);
11178585484eSchristos 	if (rpc->uri == NULL) {
11188585484eSchristos 		mm_free(rpc);
11198585484eSchristos 		return (NULL);
11208585484eSchristos 	}
11218585484eSchristos 	rpc->request_new = req_new;
11228585484eSchristos 	rpc->request_new_arg = req_new_arg;
11238585484eSchristos 	rpc->request_free = req_free;
11248585484eSchristos 	rpc->request_unmarshal = req_unmarshal;
11258585484eSchristos 	rpc->reply_new = rpl_new;
11268585484eSchristos 	rpc->reply_new_arg = rpl_new_arg;
11278585484eSchristos 	rpc->reply_free = rpl_free;
11288585484eSchristos 	rpc->reply_complete = rpl_complete;
11298585484eSchristos 	rpc->reply_marshal = rpl_marshal;
11308585484eSchristos 	return (rpc);
11318585484eSchristos }
11328585484eSchristos 
11338585484eSchristos int
11348585484eSchristos evrpc_register_generic(struct evrpc_base *base, const char *name,
11358585484eSchristos     void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
11368585484eSchristos     void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
11378585484eSchristos     int (*req_unmarshal)(void *, struct evbuffer *),
11388585484eSchristos     void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
11398585484eSchristos     int (*rpl_complete)(void *),
11408585484eSchristos     void (*rpl_marshal)(struct evbuffer *, void *))
11418585484eSchristos {
11428585484eSchristos 	struct evrpc* rpc =
11438585484eSchristos 	    evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal,
11448585484eSchristos 		rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
11458585484eSchristos 	if (rpc == NULL)
11468585484eSchristos 		return (-1);
11478585484eSchristos 	evrpc_register_rpc(base, rpc,
11488585484eSchristos 	    (void (*)(struct evrpc_req_generic*, void *))callback, cbarg);
11498585484eSchristos 	return (0);
11508585484eSchristos }
11518585484eSchristos 
11528585484eSchristos /** accessors for obscure and undocumented functionality */
11538585484eSchristos struct evrpc_pool *
11548585484eSchristos evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
11558585484eSchristos {
11568585484eSchristos 	return (ctx->pool);
11578585484eSchristos }
11588585484eSchristos 
11598585484eSchristos void
11608585484eSchristos evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
11618585484eSchristos     struct evrpc_pool *pool)
11628585484eSchristos {
11638585484eSchristos 	ctx->pool = pool;
11648585484eSchristos }
11658585484eSchristos 
11668585484eSchristos void
11678585484eSchristos evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
11688585484eSchristos     void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
11698585484eSchristos     void *cb_arg)
11708585484eSchristos {
11718585484eSchristos 	ctx->cb = cb;
11728585484eSchristos 	ctx->cb_arg = cb_arg;
11738585484eSchristos }
1174