1 /* $NetBSD: evrpc.c,v 1.3 2015/01/29 07:26:02 spz Exp $ */
2 /*
3 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
4 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28 #include "event2/event-config.h"
29 #include <sys/cdefs.h>
30 __RCSID("$NetBSD: evrpc.c,v 1.3 2015/01/29 07:26:02 spz Exp $");
31
32 #ifdef WIN32
33 #define WIN32_LEAN_AND_MEAN
34 #include <winsock2.h>
35 #include <windows.h>
36 #undef WIN32_LEAN_AND_MEAN
37 #endif
38
39 #include <sys/types.h>
40 #ifndef WIN32
41 #include <sys/socket.h>
42 #endif
43 #ifdef _EVENT_HAVE_SYS_TIME_H
44 #include <sys/time.h>
45 #endif
46 #include <sys/queue.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #ifndef WIN32
50 #include <unistd.h>
51 #endif
52 #include <errno.h>
53 #include <signal.h>
54 #include <string.h>
55
56 #include <sys/queue.h>
57
58 #include "event2/event.h"
59 #include "event2/event_struct.h"
60 #include "event2/rpc.h"
61 #include "event2/rpc_struct.h"
62 #include "evrpc-internal.h"
63 #include "event2/http.h"
64 #include "event2/buffer.h"
65 #include "event2/tag.h"
66 #include "event2/http_struct.h"
67 #include "event2/http_compat.h"
68 #include "event2/util.h"
69 #include "util-internal.h"
70 #include "log-internal.h"
71 #include "mm-internal.h"
72
73 struct evrpc_base *
evrpc_init(struct evhttp * http_server)74 evrpc_init(struct evhttp *http_server)
75 {
76 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
77 if (base == NULL)
78 return (NULL);
79
80 /* we rely on the tagging sub system */
81 evtag_init();
82
83 TAILQ_INIT(&base->registered_rpcs);
84 TAILQ_INIT(&base->input_hooks);
85 TAILQ_INIT(&base->output_hooks);
86
87 TAILQ_INIT(&base->paused_requests);
88
89 base->http_server = http_server;
90
91 return (base);
92 }
93
94 void
evrpc_free(struct evrpc_base * base)95 evrpc_free(struct evrpc_base *base)
96 {
97 struct evrpc *rpc;
98 struct evrpc_hook *hook;
99 struct evrpc_hook_ctx *pause;
100 int r;
101
102 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
103 r = evrpc_unregister_rpc(base, rpc->uri);
104 EVUTIL_ASSERT(r == 0);
105 }
106 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
107 TAILQ_REMOVE(&base->paused_requests, pause, next);
108 mm_free(pause);
109 }
110 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
111 r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
112 EVUTIL_ASSERT(r);
113 }
114 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
115 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
116 EVUTIL_ASSERT(r);
117 }
118 mm_free(base);
119 }
120
121 void *
evrpc_add_hook(void * vbase,enum EVRPC_HOOK_TYPE hook_type,int (* cb)(void *,struct evhttp_request *,struct evbuffer *,void *),void * cb_arg)122 evrpc_add_hook(void *vbase,
123 enum EVRPC_HOOK_TYPE hook_type,
124 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
125 void *cb_arg)
126 {
127 struct _evrpc_hooks *base = vbase;
128 struct evrpc_hook_list *head = NULL;
129 struct evrpc_hook *hook = NULL;
130 switch (hook_type) {
131 case EVRPC_INPUT:
132 head = &base->in_hooks;
133 break;
134 case EVRPC_OUTPUT:
135 head = &base->out_hooks;
136 break;
137 default:
138 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
139 }
140
141 hook = mm_calloc(1, sizeof(struct evrpc_hook));
142 EVUTIL_ASSERT(hook != NULL);
143
144 hook->process = cb;
145 hook->process_arg = cb_arg;
146 TAILQ_INSERT_TAIL(head, hook, next);
147
148 return (hook);
149 }
150
151 static int
evrpc_remove_hook_internal(struct evrpc_hook_list * head,void * handle)152 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
153 {
154 struct evrpc_hook *hook = NULL;
155 TAILQ_FOREACH(hook, head, next) {
156 if (hook == handle) {
157 TAILQ_REMOVE(head, hook, next);
158 mm_free(hook);
159 return (1);
160 }
161 }
162
163 return (0);
164 }
165
166 /*
167 * remove the hook specified by the handle
168 */
169
170 int
evrpc_remove_hook(void * vbase,enum EVRPC_HOOK_TYPE hook_type,void * handle)171 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
172 {
173 struct _evrpc_hooks *base = vbase;
174 struct evrpc_hook_list *head = NULL;
175 switch (hook_type) {
176 case EVRPC_INPUT:
177 head = &base->in_hooks;
178 break;
179 case EVRPC_OUTPUT:
180 head = &base->out_hooks;
181 break;
182 default:
183 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
184 }
185
186 return (evrpc_remove_hook_internal(head, handle));
187 }
188
189 static int
evrpc_process_hooks(struct evrpc_hook_list * head,void * ctx,struct evhttp_request * req,struct evbuffer * evbuf)190 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
191 struct evhttp_request *req, struct evbuffer *evbuf)
192 {
193 struct evrpc_hook *hook;
194 TAILQ_FOREACH(hook, head, next) {
195 int res = hook->process(ctx, req, evbuf, hook->process_arg);
196 if (res != EVRPC_CONTINUE)
197 return (res);
198 }
199
200 return (EVRPC_CONTINUE);
201 }
202
203 static void evrpc_pool_schedule(struct evrpc_pool *pool);
204 static void evrpc_request_cb(struct evhttp_request *, void *);
205
/*
 * RPC registration.  evrpc_register_rpc() below registers a new RPC
 * with the HTTP server.  The evrpc object is expected to have been
 * filled in via the EVRPC_REGISTER_OBJECT macro, which in turn calls
 * that function.
 */
211
212 static char *
evrpc_construct_uri(const char * uri)213 evrpc_construct_uri(const char *uri)
214 {
215 char *constructed_uri;
216 size_t constructed_uri_len;
217
218 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
219 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
220 event_err(1, "%s: failed to register rpc at %s",
221 __func__, uri);
222 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
223 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
224 constructed_uri[constructed_uri_len - 1] = '\0';
225
226 return (constructed_uri);
227 }
228
229 int
evrpc_register_rpc(struct evrpc_base * base,struct evrpc * rpc,void (* cb)(struct evrpc_req_generic *,void *),void * cb_arg)230 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
231 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
232 {
233 char *constructed_uri = evrpc_construct_uri(rpc->uri);
234
235 rpc->base = base;
236 rpc->cb = cb;
237 rpc->cb_arg = cb_arg;
238
239 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
240
241 evhttp_set_cb(base->http_server,
242 constructed_uri,
243 evrpc_request_cb,
244 rpc);
245
246 mm_free(constructed_uri);
247
248 return (0);
249 }
250
251 int
evrpc_unregister_rpc(struct evrpc_base * base,const char * name)252 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
253 {
254 char *registered_uri = NULL;
255 struct evrpc *rpc;
256 int r;
257
258 /* find the right rpc; linear search might be slow */
259 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
260 if (strcmp(rpc->uri, name) == 0)
261 break;
262 }
263 if (rpc == NULL) {
264 /* We did not find an RPC with this name */
265 return (-1);
266 }
267 TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
268
269 registered_uri = evrpc_construct_uri(name);
270
271 /* remove the http server callback */
272 r = evhttp_del_cb(base->http_server, registered_uri);
273 EVUTIL_ASSERT(r == 0);
274
275 mm_free(registered_uri);
276
277 mm_free(__UNCONST(rpc->uri));
278 mm_free(rpc);
279 return (0);
280 }
281
282 static int evrpc_pause_request(void *vbase, void *ctx,
283 void (*cb)(void *, enum EVRPC_HOOK_RESULT));
284 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
285
286 static void
evrpc_request_cb(struct evhttp_request * req,void * arg)287 evrpc_request_cb(struct evhttp_request *req, void *arg)
288 {
289 struct evrpc *rpc = arg;
290 struct evrpc_req_generic *rpc_state = NULL;
291
292 /* let's verify the outside parameters */
293 if (req->type != EVHTTP_REQ_POST ||
294 evbuffer_get_length(req->input_buffer) <= 0)
295 goto error;
296
297 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
298 if (rpc_state == NULL)
299 goto error;
300 rpc_state->rpc = rpc;
301 rpc_state->http_req = req;
302 rpc_state->rpc_data = NULL;
303
304 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
305 int hook_res;
306
307 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
308
309 /*
310 * allow hooks to modify the outgoing request
311 */
312 hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
313 rpc_state, req, req->input_buffer);
314 switch (hook_res) {
315 case EVRPC_TERMINATE:
316 goto error;
317 case EVRPC_PAUSE:
318 evrpc_pause_request(rpc->base, rpc_state,
319 evrpc_request_cb_closure);
320 return;
321 case EVRPC_CONTINUE:
322 break;
323 default:
324 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
325 hook_res == EVRPC_CONTINUE ||
326 hook_res == EVRPC_PAUSE);
327 }
328 }
329
330 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
331 return;
332
333 error:
334 if (rpc_state != NULL)
335 evrpc_reqstate_free(rpc_state);
336 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
337 return;
338 }
339
340 static void
evrpc_request_cb_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)341 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
342 {
343 struct evrpc_req_generic *rpc_state = arg;
344 struct evrpc *rpc;
345 struct evhttp_request *req;
346
347 EVUTIL_ASSERT(rpc_state);
348 rpc = rpc_state->rpc;
349 req = rpc_state->http_req;
350
351 if (hook_res == EVRPC_TERMINATE)
352 goto error;
353
354 /* let's check that we can parse the request */
355 rpc_state->request = rpc->request_new(rpc->request_new_arg);
356 if (rpc_state->request == NULL)
357 goto error;
358
359 if (rpc->request_unmarshal(
360 rpc_state->request, req->input_buffer) == -1) {
361 /* we failed to parse the request; that's a bummer */
362 goto error;
363 }
364
365 /* at this point, we have a well formed request, prepare the reply */
366
367 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
368 if (rpc_state->reply == NULL)
369 goto error;
370
371 /* give the rpc to the user; they can deal with it */
372 rpc->cb(rpc_state, rpc->cb_arg);
373
374 return;
375
376 error:
377 if (rpc_state != NULL)
378 evrpc_reqstate_free(rpc_state);
379 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
380 return;
381 }
382
383
384 void
evrpc_reqstate_free(struct evrpc_req_generic * rpc_state)385 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
386 {
387 struct evrpc *rpc;
388 EVUTIL_ASSERT(rpc_state != NULL);
389 rpc = rpc_state->rpc;
390
391 /* clean up all memory */
392 if (rpc_state->hook_meta != NULL)
393 evrpc_hook_context_free(rpc_state->hook_meta);
394 if (rpc_state->request != NULL)
395 rpc->request_free(rpc_state->request);
396 if (rpc_state->reply != NULL)
397 rpc->reply_free(rpc_state->reply);
398 if (rpc_state->rpc_data != NULL)
399 evbuffer_free(rpc_state->rpc_data);
400 mm_free(rpc_state);
401 }
402
403 static void
404 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
405
406 void
evrpc_request_done(struct evrpc_req_generic * rpc_state)407 evrpc_request_done(struct evrpc_req_generic *rpc_state)
408 {
409 struct evhttp_request *req;
410 struct evrpc *rpc;
411
412 EVUTIL_ASSERT(rpc_state);
413
414 req = rpc_state->http_req;
415 rpc = rpc_state->rpc;
416
417 if (rpc->reply_complete(rpc_state->reply) == -1) {
418 /* the reply was not completely filled in. error out */
419 goto error;
420 }
421
422 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
423 /* out of memory */
424 goto error;
425 }
426
427 /* serialize the reply */
428 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
429
430 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
431 int hook_res;
432
433 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
434
435 /* do hook based tweaks to the request */
436 hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
437 rpc_state, req, rpc_state->rpc_data);
438 switch (hook_res) {
439 case EVRPC_TERMINATE:
440 goto error;
441 case EVRPC_PAUSE:
442 if (evrpc_pause_request(rpc->base, rpc_state,
443 evrpc_request_done_closure) == -1)
444 goto error;
445 return;
446 case EVRPC_CONTINUE:
447 break;
448 default:
449 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
450 hook_res == EVRPC_CONTINUE ||
451 hook_res == EVRPC_PAUSE);
452 }
453 }
454
455 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
456 return;
457
458 error:
459 if (rpc_state != NULL)
460 evrpc_reqstate_free(rpc_state);
461 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
462 return;
463 }
464
465 void *
evrpc_get_request(struct evrpc_req_generic * req)466 evrpc_get_request(struct evrpc_req_generic *req)
467 {
468 return req->request;
469 }
470
471 void *
evrpc_get_reply(struct evrpc_req_generic * req)472 evrpc_get_reply(struct evrpc_req_generic *req)
473 {
474 return req->reply;
475 }
476
477 static void
evrpc_request_done_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)478 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
479 {
480 struct evrpc_req_generic *rpc_state = arg;
481 struct evhttp_request *req;
482 EVUTIL_ASSERT(rpc_state);
483 req = rpc_state->http_req;
484
485 if (hook_res == EVRPC_TERMINATE)
486 goto error;
487
488 /* on success, we are going to transmit marshaled binary data */
489 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
490 evhttp_add_header(req->output_headers,
491 "Content-Type", "application/octet-stream");
492 }
493 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
494
495 evrpc_reqstate_free(rpc_state);
496
497 return;
498
499 error:
500 if (rpc_state != NULL)
501 evrpc_reqstate_free(rpc_state);
502 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
503 return;
504 }
505
506
507 /* Client implementation of RPC site */
508
509 static int evrpc_schedule_request(struct evhttp_connection *connection,
510 struct evrpc_request_wrapper *ctx);
511
512 struct evrpc_pool *
evrpc_pool_new(struct event_base * base)513 evrpc_pool_new(struct event_base *base)
514 {
515 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
516 if (pool == NULL)
517 return (NULL);
518
519 TAILQ_INIT(&pool->connections);
520 TAILQ_INIT(&pool->requests);
521
522 TAILQ_INIT(&pool->paused_requests);
523
524 TAILQ_INIT(&pool->input_hooks);
525 TAILQ_INIT(&pool->output_hooks);
526
527 pool->base = base;
528 pool->timeout = -1;
529
530 return (pool);
531 }
532
533 static void
evrpc_request_wrapper_free(struct evrpc_request_wrapper * request)534 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
535 {
536 if (request->hook_meta != NULL)
537 evrpc_hook_context_free(request->hook_meta);
538 mm_free(request->name);
539 mm_free(request);
540 }
541
542 void
evrpc_pool_free(struct evrpc_pool * pool)543 evrpc_pool_free(struct evrpc_pool *pool)
544 {
545 struct evhttp_connection *connection;
546 struct evrpc_request_wrapper *request;
547 struct evrpc_hook_ctx *pause;
548 struct evrpc_hook *hook;
549 int r;
550
551 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
552 TAILQ_REMOVE(&pool->requests, request, next);
553 evrpc_request_wrapper_free(request);
554 }
555
556 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
557 TAILQ_REMOVE(&pool->paused_requests, pause, next);
558 mm_free(pause);
559 }
560
561 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
562 TAILQ_REMOVE(&pool->connections, connection, next);
563 evhttp_connection_free(connection);
564 }
565
566 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
567 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
568 EVUTIL_ASSERT(r);
569 }
570
571 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
572 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
573 EVUTIL_ASSERT(r);
574 }
575
576 mm_free(pool);
577 }
578
579 /*
580 * Add a connection to the RPC pool. A request scheduled on the pool
581 * may use any available connection.
582 */
583
584 void
evrpc_pool_add_connection(struct evrpc_pool * pool,struct evhttp_connection * connection)585 evrpc_pool_add_connection(struct evrpc_pool *pool,
586 struct evhttp_connection *connection)
587 {
588 EVUTIL_ASSERT(connection->http_server == NULL);
589 TAILQ_INSERT_TAIL(&pool->connections, connection, next);
590
591 /*
592 * associate an event base with this connection
593 */
594 if (pool->base != NULL)
595 evhttp_connection_set_base(connection, pool->base);
596
597 /*
598 * unless a timeout was specifically set for a connection,
599 * the connection inherits the timeout from the pool.
600 */
601 if (connection->timeout == -1)
602 connection->timeout = pool->timeout;
603
604 /*
605 * if we have any requests pending, schedule them with the new
606 * connections.
607 */
608
609 if (TAILQ_FIRST(&pool->requests) != NULL) {
610 struct evrpc_request_wrapper *request =
611 TAILQ_FIRST(&pool->requests);
612 TAILQ_REMOVE(&pool->requests, request, next);
613 evrpc_schedule_request(connection, request);
614 }
615 }
616
617 void
evrpc_pool_remove_connection(struct evrpc_pool * pool,struct evhttp_connection * connection)618 evrpc_pool_remove_connection(struct evrpc_pool *pool,
619 struct evhttp_connection *connection)
620 {
621 TAILQ_REMOVE(&pool->connections, connection, next);
622 }
623
624 void
evrpc_pool_set_timeout(struct evrpc_pool * pool,int timeout_in_secs)625 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
626 {
627 struct evhttp_connection *evcon;
628 TAILQ_FOREACH(evcon, &pool->connections, next) {
629 evcon->timeout = timeout_in_secs;
630 }
631 pool->timeout = timeout_in_secs;
632 }
633
634
635 static void evrpc_reply_done(struct evhttp_request *, void *);
636 static void evrpc_request_timeout(evutil_socket_t, short, void *);
637
638 /*
639 * Finds a connection object associated with the pool that is currently
640 * idle and can be used to make a request.
641 */
642 static struct evhttp_connection *
evrpc_pool_find_connection(struct evrpc_pool * pool)643 evrpc_pool_find_connection(struct evrpc_pool *pool)
644 {
645 struct evhttp_connection *connection;
646 TAILQ_FOREACH(connection, &pool->connections, next) {
647 if (TAILQ_FIRST(&connection->requests) == NULL)
648 return (connection);
649 }
650
651 return (NULL);
652 }
653
654 /*
655 * Prototypes responsible for evrpc scheduling and hooking
656 */
657
658 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
659
660 /*
661 * We assume that the ctx is no longer queued on the pool.
662 */
663 static int
evrpc_schedule_request(struct evhttp_connection * connection,struct evrpc_request_wrapper * ctx)664 evrpc_schedule_request(struct evhttp_connection *connection,
665 struct evrpc_request_wrapper *ctx)
666 {
667 struct evhttp_request *req = NULL;
668 struct evrpc_pool *pool = ctx->pool;
669 struct evrpc_status status;
670
671 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
672 goto error;
673
674 /* serialize the request data into the output buffer */
675 ctx->request_marshal(req->output_buffer, ctx->request);
676
677 /* we need to know the connection that we might have to abort */
678 ctx->evcon = connection;
679
680 /* if we get paused we also need to know the request */
681 ctx->req = req;
682
683 if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
684 int hook_res;
685
686 evrpc_hook_associate_meta(&ctx->hook_meta, connection);
687
688 /* apply hooks to the outgoing request */
689 hook_res = evrpc_process_hooks(&pool->output_hooks,
690 ctx, req, req->output_buffer);
691
692 switch (hook_res) {
693 case EVRPC_TERMINATE:
694 goto error;
695 case EVRPC_PAUSE:
696 /* we need to be explicitly resumed */
697 if (evrpc_pause_request(pool, ctx,
698 evrpc_schedule_request_closure) == -1)
699 goto error;
700 return (0);
701 case EVRPC_CONTINUE:
702 /* we can just continue */
703 break;
704 default:
705 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
706 hook_res == EVRPC_CONTINUE ||
707 hook_res == EVRPC_PAUSE);
708 }
709 }
710
711 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
712 return (0);
713
714 error:
715 memset(&status, 0, sizeof(status));
716 status.error = EVRPC_STATUS_ERR_UNSTARTED;
717 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
718 evrpc_request_wrapper_free(ctx);
719 return (-1);
720 }
721
722 static void
evrpc_schedule_request_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)723 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
724 {
725 struct evrpc_request_wrapper *ctx = arg;
726 struct evhttp_connection *connection = ctx->evcon;
727 struct evhttp_request *req = ctx->req;
728 struct evrpc_pool *pool = ctx->pool;
729 struct evrpc_status status;
730 char *uri = NULL;
731 int res = 0;
732
733 if (hook_res == EVRPC_TERMINATE)
734 goto error;
735
736 uri = evrpc_construct_uri(ctx->name);
737 if (uri == NULL)
738 goto error;
739
740 if (pool->timeout > 0) {
741 /*
742 * a timeout after which the whole rpc is going to be aborted.
743 */
744 struct timeval tv;
745 evutil_timerclear(&tv);
746 tv.tv_sec = pool->timeout;
747 evtimer_add(&ctx->ev_timeout, &tv);
748 }
749
750 /* start the request over the connection */
751 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
752 mm_free(uri);
753
754 if (res == -1)
755 goto error;
756
757 return;
758
759 error:
760 memset(&status, 0, sizeof(status));
761 status.error = EVRPC_STATUS_ERR_UNSTARTED;
762 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
763 evrpc_request_wrapper_free(ctx);
764 }
765
766 /* we just queue the paused request on the pool under the req object */
767 static int
evrpc_pause_request(void * vbase,void * ctx,void (* cb)(void *,enum EVRPC_HOOK_RESULT))768 evrpc_pause_request(void *vbase, void *ctx,
769 void (*cb)(void *, enum EVRPC_HOOK_RESULT))
770 {
771 struct _evrpc_hooks *base = vbase;
772 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
773 if (pause == NULL)
774 return (-1);
775
776 pause->ctx = ctx;
777 pause->cb = cb;
778
779 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
780 return (0);
781 }
782
783 int
evrpc_resume_request(void * vbase,void * ctx,enum EVRPC_HOOK_RESULT res)784 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
785 {
786 struct _evrpc_hooks *base = vbase;
787 struct evrpc_pause_list *head = &base->pause_requests;
788 struct evrpc_hook_ctx *pause;
789
790 TAILQ_FOREACH(pause, head, next) {
791 if (pause->ctx == ctx)
792 break;
793 }
794
795 if (pause == NULL)
796 return (-1);
797
798 (*pause->cb)(pause->ctx, res);
799 TAILQ_REMOVE(head, pause, next);
800 mm_free(pause);
801 return (0);
802 }
803
804 int
evrpc_make_request(struct evrpc_request_wrapper * ctx)805 evrpc_make_request(struct evrpc_request_wrapper *ctx)
806 {
807 struct evrpc_pool *pool = ctx->pool;
808
809 /* initialize the event structure for this rpc */
810 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
811
812 /* we better have some available connections on the pool */
813 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
814
815 /*
816 * if no connection is available, we queue the request on the pool,
817 * the next time a connection is empty, the rpc will be send on that.
818 */
819 TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
820
821 evrpc_pool_schedule(pool);
822
823 return (0);
824 }
825
826
827 struct evrpc_request_wrapper *
evrpc_make_request_ctx(struct evrpc_pool * pool,void * request,void * reply,const char * rpcname,void (* req_marshal)(struct evbuffer *,void *),void (* rpl_clear)(void *),int (* rpl_unmarshal)(void *,struct evbuffer *),void (* cb)(struct evrpc_status *,void *,void *,void *),void * cbarg)828 evrpc_make_request_ctx(
829 struct evrpc_pool *pool, void *request, void *reply,
830 const char *rpcname,
831 void (*req_marshal)(struct evbuffer*, void *),
832 void (*rpl_clear)(void *),
833 int (*rpl_unmarshal)(void *, struct evbuffer *),
834 void (*cb)(struct evrpc_status *, void *, void *, void *),
835 void *cbarg)
836 {
837 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
838 mm_malloc(sizeof(struct evrpc_request_wrapper));
839 if (ctx == NULL)
840 return (NULL);
841
842 ctx->pool = pool;
843 ctx->hook_meta = NULL;
844 ctx->evcon = NULL;
845 ctx->name = mm_strdup(rpcname);
846 if (ctx->name == NULL) {
847 mm_free(ctx);
848 return (NULL);
849 }
850 ctx->cb = cb;
851 ctx->cb_arg = cbarg;
852 ctx->request = request;
853 ctx->reply = reply;
854 ctx->request_marshal = req_marshal;
855 ctx->reply_clear = rpl_clear;
856 ctx->reply_unmarshal = rpl_unmarshal;
857
858 return (ctx);
859 }
860
861 static void
862 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
863
864 static void
evrpc_reply_done(struct evhttp_request * req,void * arg)865 evrpc_reply_done(struct evhttp_request *req, void *arg)
866 {
867 struct evrpc_request_wrapper *ctx = arg;
868 struct evrpc_pool *pool = ctx->pool;
869 int hook_res = EVRPC_CONTINUE;
870
871 /* cancel any timeout we might have scheduled */
872 event_del(&ctx->ev_timeout);
873
874 ctx->req = req;
875
876 /* we need to get the reply now */
877 if (req == NULL) {
878 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
879 return;
880 }
881
882 if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
883 evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
884
885 /* apply hooks to the incoming request */
886 hook_res = evrpc_process_hooks(&pool->input_hooks,
887 ctx, req, req->input_buffer);
888
889 switch (hook_res) {
890 case EVRPC_TERMINATE:
891 case EVRPC_CONTINUE:
892 break;
893 case EVRPC_PAUSE:
894 /*
895 * if we get paused we also need to know the
896 * request. unfortunately, the underlying
897 * layer is going to free it. we need to
898 * request ownership explicitly
899 */
900 if (req != NULL)
901 evhttp_request_own(req);
902
903 evrpc_pause_request(pool, ctx,
904 evrpc_reply_done_closure);
905 return;
906 default:
907 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
908 hook_res == EVRPC_CONTINUE ||
909 hook_res == EVRPC_PAUSE);
910 }
911 }
912
913 evrpc_reply_done_closure(ctx, hook_res);
914
915 /* http request is being freed by underlying layer */
916 }
917
918 static void
evrpc_reply_done_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)919 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
920 {
921 struct evrpc_request_wrapper *ctx = arg;
922 struct evhttp_request *req = ctx->req;
923 struct evrpc_pool *pool = ctx->pool;
924 struct evrpc_status status;
925 int res = -1;
926
927 memset(&status, 0, sizeof(status));
928 status.http_req = req;
929
930 /* we need to get the reply now */
931 if (req == NULL) {
932 status.error = EVRPC_STATUS_ERR_TIMEOUT;
933 } else if (hook_res == EVRPC_TERMINATE) {
934 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
935 } else {
936 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
937 if (res == -1)
938 status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
939 }
940
941 if (res == -1) {
942 /* clear everything that we might have written previously */
943 ctx->reply_clear(ctx->reply);
944 }
945
946 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
947
948 evrpc_request_wrapper_free(ctx);
949
950 /* the http layer owned the original request structure, but if we
951 * got paused, we asked for ownership and need to free it here. */
952 if (req != NULL && evhttp_request_is_owned(req))
953 evhttp_request_free(req);
954
955 /* see if we can schedule another request */
956 evrpc_pool_schedule(pool);
957 }
958
959 static void
evrpc_pool_schedule(struct evrpc_pool * pool)960 evrpc_pool_schedule(struct evrpc_pool *pool)
961 {
962 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
963 struct evhttp_connection *evcon;
964
965 /* if no requests are pending, we have no work */
966 if (ctx == NULL)
967 return;
968
969 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
970 TAILQ_REMOVE(&pool->requests, ctx, next);
971 evrpc_schedule_request(evcon, ctx);
972 }
973 }
974
975 static void
evrpc_request_timeout(evutil_socket_t fd,short what,void * arg)976 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
977 {
978 struct evrpc_request_wrapper *ctx = arg;
979 struct evhttp_connection *evcon = ctx->evcon;
980 EVUTIL_ASSERT(evcon != NULL);
981
982 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
983 }
984
985 /*
986 * frees potential meta data associated with a request.
987 */
988
989 static void
evrpc_meta_data_free(struct evrpc_meta_list * meta_data)990 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
991 {
992 struct evrpc_meta *entry;
993 EVUTIL_ASSERT(meta_data != NULL);
994
995 while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
996 TAILQ_REMOVE(meta_data, entry, next);
997 mm_free(entry->key);
998 mm_free(entry->data);
999 mm_free(entry);
1000 }
1001 }
1002
1003 static struct evrpc_hook_meta *
evrpc_hook_meta_new(void)1004 evrpc_hook_meta_new(void)
1005 {
1006 struct evrpc_hook_meta *ctx;
1007 ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
1008 EVUTIL_ASSERT(ctx != NULL);
1009
1010 TAILQ_INIT(&ctx->meta_data);
1011 ctx->evcon = NULL;
1012
1013 return (ctx);
1014 }
1015
1016 static void
evrpc_hook_associate_meta(struct evrpc_hook_meta ** pctx,struct evhttp_connection * evcon)1017 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
1018 struct evhttp_connection *evcon)
1019 {
1020 struct evrpc_hook_meta *ctx = *pctx;
1021 if (ctx == NULL)
1022 *pctx = ctx = evrpc_hook_meta_new();
1023 ctx->evcon = evcon;
1024 }
1025
1026 static void
evrpc_hook_context_free(struct evrpc_hook_meta * ctx)1027 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
1028 {
1029 evrpc_meta_data_free(&ctx->meta_data);
1030 mm_free(ctx);
1031 }
1032
1033 /* Adds meta data */
1034 void
evrpc_hook_add_meta(void * ctx,const char * key,const void * data,size_t data_size)1035 evrpc_hook_add_meta(void *ctx, const char *key,
1036 const void *data, size_t data_size)
1037 {
1038 struct evrpc_request_wrapper *req = ctx;
1039 struct evrpc_hook_meta *store = NULL;
1040 struct evrpc_meta *meta = NULL;
1041
1042 if ((store = req->hook_meta) == NULL)
1043 store = req->hook_meta = evrpc_hook_meta_new();
1044
1045 meta = mm_malloc(sizeof(struct evrpc_meta));
1046 EVUTIL_ASSERT(meta != NULL);
1047 meta->key = mm_strdup(key);
1048 EVUTIL_ASSERT(meta->key != NULL);
1049 meta->data_size = data_size;
1050 meta->data = mm_malloc(data_size);
1051 EVUTIL_ASSERT(meta->data != NULL);
1052 memcpy(meta->data, data, data_size);
1053
1054 TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
1055 }
1056
1057 int
evrpc_hook_find_meta(void * ctx,const char * key,void ** data,size_t * data_size)1058 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
1059 {
1060 struct evrpc_request_wrapper *req = ctx;
1061 struct evrpc_meta *meta = NULL;
1062
1063 if (req->hook_meta == NULL)
1064 return (-1);
1065
1066 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
1067 if (strcmp(meta->key, key) == 0) {
1068 *data = meta->data;
1069 *data_size = meta->data_size;
1070 return (0);
1071 }
1072 }
1073
1074 return (-1);
1075 }
1076
1077 struct evhttp_connection *
evrpc_hook_get_connection(void * ctx)1078 evrpc_hook_get_connection(void *ctx)
1079 {
1080 struct evrpc_request_wrapper *req = ctx;
1081 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
1082 }
1083
1084 int
evrpc_send_request_generic(struct evrpc_pool * pool,void * request,void * reply,void (* cb)(struct evrpc_status *,void *,void *,void *),void * cb_arg,const char * rpcname,void (* req_marshal)(struct evbuffer *,void *),void (* rpl_clear)(void *),int (* rpl_unmarshal)(void *,struct evbuffer *))1085 evrpc_send_request_generic(struct evrpc_pool *pool,
1086 void *request, void *reply,
1087 void (*cb)(struct evrpc_status *, void *, void *, void *),
1088 void *cb_arg,
1089 const char *rpcname,
1090 void (*req_marshal)(struct evbuffer *, void *),
1091 void (*rpl_clear)(void *),
1092 int (*rpl_unmarshal)(void *, struct evbuffer *))
1093 {
1094 struct evrpc_status status;
1095 struct evrpc_request_wrapper *ctx;
1096 ctx = evrpc_make_request_ctx(pool, request, reply,
1097 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
1098 if (ctx == NULL)
1099 goto error;
1100 return (evrpc_make_request(ctx));
1101 error:
1102 memset(&status, 0, sizeof(status));
1103 status.error = EVRPC_STATUS_ERR_UNSTARTED;
1104 (*(cb))(&status, request, reply, cb_arg);
1105 return (-1);
1106 }
1107
1108 /** Takes a request object and fills it in with the right magic */
1109 static struct evrpc *
evrpc_register_object(const char * name,void * (* req_new)(void *),void * req_new_arg,void (* req_free)(void *),int (* req_unmarshal)(void *,struct evbuffer *),void * (* rpl_new)(void *),void * rpl_new_arg,void (* rpl_free)(void *),int (* rpl_complete)(void *),void (* rpl_marshal)(struct evbuffer *,void *))1110 evrpc_register_object(const char *name,
1111 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
1112 int (*req_unmarshal)(void *, struct evbuffer *),
1113 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
1114 int (*rpl_complete)(void *),
1115 void (*rpl_marshal)(struct evbuffer *, void *))
1116 {
1117 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
1118 if (rpc == NULL)
1119 return (NULL);
1120 rpc->uri = mm_strdup(name);
1121 if (rpc->uri == NULL) {
1122 mm_free(rpc);
1123 return (NULL);
1124 }
1125 rpc->request_new = req_new;
1126 rpc->request_new_arg = req_new_arg;
1127 rpc->request_free = req_free;
1128 rpc->request_unmarshal = req_unmarshal;
1129 rpc->reply_new = rpl_new;
1130 rpc->reply_new_arg = rpl_new_arg;
1131 rpc->reply_free = rpl_free;
1132 rpc->reply_complete = rpl_complete;
1133 rpc->reply_marshal = rpl_marshal;
1134 return (rpc);
1135 }
1136
/*
 * Builds an RPC description with evrpc_register_object() and hands it to
 * evrpc_register_rpc() on base together with callback and cbarg.
 *
 * Returns 0 once evrpc_register_rpc() has been called, or -1 if the RPC
 * description could not be created.  The result of evrpc_register_rpc()
 * itself is not inspected.
 */
int
evrpc_register_generic(struct evrpc_base *base, const char *name,
    void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
    void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
    int (*req_unmarshal)(void *, struct evbuffer *),
    void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
    int (*rpl_complete)(void *),
    void (*rpl_marshal)(struct evbuffer *, void *))
{
	struct evrpc *rpc;

	rpc = evrpc_register_object(name,
	    req_new, req_new_arg, req_free, req_unmarshal,
	    rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
	if (rpc == NULL)
		return (-1);

	evrpc_register_rpc(base, rpc, callback, cbarg);

	return (0);
}
1155
1156 /** accessors for obscure and undocumented functionality */
1157 struct evrpc_pool *
evrpc_request_get_pool(struct evrpc_request_wrapper * ctx)1158 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
1159 {
1160 return (ctx->pool);
1161 }
1162
1163 void
evrpc_request_set_pool(struct evrpc_request_wrapper * ctx,struct evrpc_pool * pool)1164 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
1165 struct evrpc_pool *pool)
1166 {
1167 ctx->pool = pool;
1168 }
1169
1170 void
evrpc_request_set_cb(struct evrpc_request_wrapper * ctx,void (* cb)(struct evrpc_status *,void * request,void * reply,void * arg),void * cb_arg)1171 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
1172 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
1173 void *cb_arg)
1174 {
1175 ctx->cb = cb;
1176 ctx->cb_arg = cb_arg;
1177 }
1178