/*-
 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD: src/usr.sbin/nscd/mp_rs_query.c,v 1.4 2008/10/12 00:44:27 delphij Exp $
 */
28*86d7f5d3SJohn Marino
29*86d7f5d3SJohn Marino #include <sys/socket.h>
30*86d7f5d3SJohn Marino #include <sys/time.h>
31*86d7f5d3SJohn Marino #include <sys/types.h>
32*86d7f5d3SJohn Marino #include <sys/event.h>
33*86d7f5d3SJohn Marino #include <assert.h>
34*86d7f5d3SJohn Marino #include <errno.h>
35*86d7f5d3SJohn Marino #include <stdlib.h>
36*86d7f5d3SJohn Marino #include <string.h>
37*86d7f5d3SJohn Marino #include <stdio.h>
38*86d7f5d3SJohn Marino
39*86d7f5d3SJohn Marino #include "cachelib.h"
40*86d7f5d3SJohn Marino #include "config.h"
41*86d7f5d3SJohn Marino #include "debug.h"
42*86d7f5d3SJohn Marino #include "log.h"
43*86d7f5d3SJohn Marino #include "query.h"
44*86d7f5d3SJohn Marino #include "mp_rs_query.h"
45*86d7f5d3SJohn Marino #include "mp_ws_query.h"
46*86d7f5d3SJohn Marino #include "singletons.h"
47*86d7f5d3SJohn Marino
/*
 * Forward declarations of the file-local state handlers.
 *
 * on_mp_read_session_request_read1 is defined below without `static`, so it
 * is not declared here (presumably mp_rs_query.h provides its prototype).
 */

/* Session teardown. */
static void on_mp_read_session_destroy(struct query_state *qstate);
static int on_mp_read_session_close_notification(struct query_state *qstate);

/* Session initiation: read the request, process it, answer the client. */
static int on_mp_read_session_request_read2(struct query_state *qstate);
static int on_mp_read_session_request_process(struct query_state *qstate);
static int on_mp_read_session_response_write1(struct query_state *qstate);

/* Dispatching of the requests that arrive inside an open session. */
static int on_mp_read_session_mapper(struct query_state *qstate);

/* Per-element read requests: process it, then answer the client. */
static int on_mp_read_session_read_request_process(struct query_state *qstate);
static int on_mp_read_session_read_response_write1(struct query_state *qstate);
static int on_mp_read_session_read_response_write2(struct query_state *qstate);
59*86d7f5d3SJohn Marino /*
60*86d7f5d3SJohn Marino * This function is used as the query_state's destroy_func to make the
61*86d7f5d3SJohn Marino * proper cleanup in case of errors.
62*86d7f5d3SJohn Marino */
63*86d7f5d3SJohn Marino static void
on_mp_read_session_destroy(struct query_state * qstate)64*86d7f5d3SJohn Marino on_mp_read_session_destroy(struct query_state *qstate)
65*86d7f5d3SJohn Marino {
66*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_destroy);
67*86d7f5d3SJohn Marino finalize_comm_element(&qstate->request);
68*86d7f5d3SJohn Marino finalize_comm_element(&qstate->response);
69*86d7f5d3SJohn Marino
70*86d7f5d3SJohn Marino if (qstate->mdata != NULL) {
71*86d7f5d3SJohn Marino configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
72*86d7f5d3SJohn Marino close_cache_mp_read_session(
73*86d7f5d3SJohn Marino (cache_mp_read_session)qstate->mdata);
74*86d7f5d3SJohn Marino configuration_unlock_entry(qstate->config_entry,
75*86d7f5d3SJohn Marino CELT_MULTIPART);
76*86d7f5d3SJohn Marino }
77*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_destroy);
78*86d7f5d3SJohn Marino }
79*86d7f5d3SJohn Marino
80*86d7f5d3SJohn Marino /*
81*86d7f5d3SJohn Marino * The functions below are used to process multipart read session initiation
82*86d7f5d3SJohn Marino * requests.
83*86d7f5d3SJohn Marino * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
84*86d7f5d3SJohn Marino * the request itself
85*86d7f5d3SJohn Marino * - on_mp_read_session_request_process processes it
86*86d7f5d3SJohn Marino * - on_mp_read_session_response_write1 sends the response
87*86d7f5d3SJohn Marino */
88*86d7f5d3SJohn Marino int
on_mp_read_session_request_read1(struct query_state * qstate)89*86d7f5d3SJohn Marino on_mp_read_session_request_read1(struct query_state *qstate)
90*86d7f5d3SJohn Marino {
91*86d7f5d3SJohn Marino struct cache_mp_read_session_request *c_mp_rs_request;
92*86d7f5d3SJohn Marino ssize_t result;
93*86d7f5d3SJohn Marino
94*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_request_read1);
95*86d7f5d3SJohn Marino if (qstate->kevent_watermark == 0)
96*86d7f5d3SJohn Marino qstate->kevent_watermark = sizeof(size_t);
97*86d7f5d3SJohn Marino else {
98*86d7f5d3SJohn Marino init_comm_element(&qstate->request,
99*86d7f5d3SJohn Marino CET_MP_READ_SESSION_REQUEST);
100*86d7f5d3SJohn Marino c_mp_rs_request = get_cache_mp_read_session_request(
101*86d7f5d3SJohn Marino &qstate->request);
102*86d7f5d3SJohn Marino
103*86d7f5d3SJohn Marino result = qstate->read_func(qstate,
104*86d7f5d3SJohn Marino &c_mp_rs_request->entry_length, sizeof(size_t));
105*86d7f5d3SJohn Marino
106*86d7f5d3SJohn Marino if (result != sizeof(size_t)) {
107*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_request_read1);
108*86d7f5d3SJohn Marino return (-1);
109*86d7f5d3SJohn Marino }
110*86d7f5d3SJohn Marino
111*86d7f5d3SJohn Marino if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
112*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_request_read1);
113*86d7f5d3SJohn Marino return (-1);
114*86d7f5d3SJohn Marino }
115*86d7f5d3SJohn Marino
116*86d7f5d3SJohn Marino c_mp_rs_request->entry = (char *)calloc(1,
117*86d7f5d3SJohn Marino c_mp_rs_request->entry_length + 1);
118*86d7f5d3SJohn Marino assert(c_mp_rs_request->entry != NULL);
119*86d7f5d3SJohn Marino
120*86d7f5d3SJohn Marino qstate->kevent_watermark = c_mp_rs_request->entry_length;
121*86d7f5d3SJohn Marino qstate->process_func = on_mp_read_session_request_read2;
122*86d7f5d3SJohn Marino }
123*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_request_read1);
124*86d7f5d3SJohn Marino return (0);
125*86d7f5d3SJohn Marino }
126*86d7f5d3SJohn Marino
127*86d7f5d3SJohn Marino static int
on_mp_read_session_request_read2(struct query_state * qstate)128*86d7f5d3SJohn Marino on_mp_read_session_request_read2(struct query_state *qstate)
129*86d7f5d3SJohn Marino {
130*86d7f5d3SJohn Marino struct cache_mp_read_session_request *c_mp_rs_request;
131*86d7f5d3SJohn Marino ssize_t result;
132*86d7f5d3SJohn Marino
133*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_request_read2);
134*86d7f5d3SJohn Marino c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
135*86d7f5d3SJohn Marino
136*86d7f5d3SJohn Marino result = qstate->read_func(qstate, c_mp_rs_request->entry,
137*86d7f5d3SJohn Marino c_mp_rs_request->entry_length);
138*86d7f5d3SJohn Marino
139*86d7f5d3SJohn Marino if (result != qstate->kevent_watermark) {
140*86d7f5d3SJohn Marino LOG_ERR_3("on_mp_read_session_request_read2",
141*86d7f5d3SJohn Marino "read failed");
142*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_request_read2);
143*86d7f5d3SJohn Marino return (-1);
144*86d7f5d3SJohn Marino }
145*86d7f5d3SJohn Marino
146*86d7f5d3SJohn Marino qstate->kevent_watermark = 0;
147*86d7f5d3SJohn Marino qstate->process_func = on_mp_read_session_request_process;
148*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_request_read2);
149*86d7f5d3SJohn Marino return (0);
150*86d7f5d3SJohn Marino }
151*86d7f5d3SJohn Marino
152*86d7f5d3SJohn Marino static int
on_mp_read_session_request_process(struct query_state * qstate)153*86d7f5d3SJohn Marino on_mp_read_session_request_process(struct query_state *qstate)
154*86d7f5d3SJohn Marino {
155*86d7f5d3SJohn Marino struct cache_mp_read_session_request *c_mp_rs_request;
156*86d7f5d3SJohn Marino struct cache_mp_read_session_response *c_mp_rs_response;
157*86d7f5d3SJohn Marino cache_mp_read_session rs;
158*86d7f5d3SJohn Marino cache_entry c_entry;
159*86d7f5d3SJohn Marino char *dec_cache_entry_name;
160*86d7f5d3SJohn Marino
161*86d7f5d3SJohn Marino char *buffer;
162*86d7f5d3SJohn Marino size_t buffer_size;
163*86d7f5d3SJohn Marino cache_mp_write_session ws;
164*86d7f5d3SJohn Marino struct agent *lookup_agent;
165*86d7f5d3SJohn Marino struct multipart_agent *mp_agent;
166*86d7f5d3SJohn Marino void *mdata;
167*86d7f5d3SJohn Marino int res;
168*86d7f5d3SJohn Marino
169*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_request_process);
170*86d7f5d3SJohn Marino init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
171*86d7f5d3SJohn Marino c_mp_rs_response = get_cache_mp_read_session_response(
172*86d7f5d3SJohn Marino &qstate->response);
173*86d7f5d3SJohn Marino c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
174*86d7f5d3SJohn Marino
175*86d7f5d3SJohn Marino qstate->config_entry = configuration_find_entry(
176*86d7f5d3SJohn Marino s_configuration, c_mp_rs_request->entry);
177*86d7f5d3SJohn Marino if (qstate->config_entry == NULL) {
178*86d7f5d3SJohn Marino c_mp_rs_response->error_code = ENOENT;
179*86d7f5d3SJohn Marino
180*86d7f5d3SJohn Marino LOG_ERR_2("read_session_request",
181*86d7f5d3SJohn Marino "can't find configuration entry '%s'."
182*86d7f5d3SJohn Marino " aborting request", c_mp_rs_request->entry);
183*86d7f5d3SJohn Marino goto fin;
184*86d7f5d3SJohn Marino }
185*86d7f5d3SJohn Marino
186*86d7f5d3SJohn Marino if (qstate->config_entry->enabled == 0) {
187*86d7f5d3SJohn Marino c_mp_rs_response->error_code = EACCES;
188*86d7f5d3SJohn Marino
189*86d7f5d3SJohn Marino LOG_ERR_2("read_session_request",
190*86d7f5d3SJohn Marino "configuration entry '%s' is disabled",
191*86d7f5d3SJohn Marino c_mp_rs_request->entry);
192*86d7f5d3SJohn Marino goto fin;
193*86d7f5d3SJohn Marino }
194*86d7f5d3SJohn Marino
195*86d7f5d3SJohn Marino if (qstate->config_entry->perform_actual_lookups != 0)
196*86d7f5d3SJohn Marino dec_cache_entry_name = strdup(
197*86d7f5d3SJohn Marino qstate->config_entry->mp_cache_params.entry_name);
198*86d7f5d3SJohn Marino else {
199*86d7f5d3SJohn Marino #ifdef NS_NSCD_EID_CHECKING
200*86d7f5d3SJohn Marino if (check_query_eids(qstate) != 0) {
201*86d7f5d3SJohn Marino c_mp_rs_response->error_code = EPERM;
202*86d7f5d3SJohn Marino goto fin;
203*86d7f5d3SJohn Marino }
204*86d7f5d3SJohn Marino #endif
205*86d7f5d3SJohn Marino
206*86d7f5d3SJohn Marino asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
207*86d7f5d3SJohn Marino qstate->config_entry->mp_cache_params.entry_name);
208*86d7f5d3SJohn Marino }
209*86d7f5d3SJohn Marino
210*86d7f5d3SJohn Marino assert(dec_cache_entry_name != NULL);
211*86d7f5d3SJohn Marino
212*86d7f5d3SJohn Marino configuration_lock_rdlock(s_configuration);
213*86d7f5d3SJohn Marino c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
214*86d7f5d3SJohn Marino configuration_unlock(s_configuration);
215*86d7f5d3SJohn Marino
216*86d7f5d3SJohn Marino if ((c_entry == INVALID_CACHE) &&
217*86d7f5d3SJohn Marino (qstate->config_entry->perform_actual_lookups != 0))
218*86d7f5d3SJohn Marino c_entry = register_new_mp_cache_entry(qstate,
219*86d7f5d3SJohn Marino dec_cache_entry_name);
220*86d7f5d3SJohn Marino
221*86d7f5d3SJohn Marino free(dec_cache_entry_name);
222*86d7f5d3SJohn Marino
223*86d7f5d3SJohn Marino if (c_entry != INVALID_CACHE_ENTRY) {
224*86d7f5d3SJohn Marino configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
225*86d7f5d3SJohn Marino rs = open_cache_mp_read_session(c_entry);
226*86d7f5d3SJohn Marino configuration_unlock_entry(qstate->config_entry,
227*86d7f5d3SJohn Marino CELT_MULTIPART);
228*86d7f5d3SJohn Marino
229*86d7f5d3SJohn Marino if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
230*86d7f5d3SJohn Marino (qstate->config_entry->perform_actual_lookups != 0)) {
231*86d7f5d3SJohn Marino lookup_agent = find_agent(s_agent_table,
232*86d7f5d3SJohn Marino c_mp_rs_request->entry, MULTIPART_AGENT);
233*86d7f5d3SJohn Marino
234*86d7f5d3SJohn Marino if ((lookup_agent != NULL) &&
235*86d7f5d3SJohn Marino (lookup_agent->type == MULTIPART_AGENT)) {
236*86d7f5d3SJohn Marino mp_agent = (struct multipart_agent *)
237*86d7f5d3SJohn Marino lookup_agent;
238*86d7f5d3SJohn Marino mdata = mp_agent->mp_init_func();
239*86d7f5d3SJohn Marino
240*86d7f5d3SJohn Marino /*
241*86d7f5d3SJohn Marino * Multipart agents read the whole snapshot
242*86d7f5d3SJohn Marino * of the data at one time.
243*86d7f5d3SJohn Marino */
244*86d7f5d3SJohn Marino configuration_lock_entry(qstate->config_entry,
245*86d7f5d3SJohn Marino CELT_MULTIPART);
246*86d7f5d3SJohn Marino ws = open_cache_mp_write_session(c_entry);
247*86d7f5d3SJohn Marino configuration_unlock_entry(qstate->config_entry,
248*86d7f5d3SJohn Marino CELT_MULTIPART);
249*86d7f5d3SJohn Marino if (ws != NULL) {
250*86d7f5d3SJohn Marino do {
251*86d7f5d3SJohn Marino buffer = NULL;
252*86d7f5d3SJohn Marino res = mp_agent->mp_lookup_func(&buffer,
253*86d7f5d3SJohn Marino &buffer_size,
254*86d7f5d3SJohn Marino mdata);
255*86d7f5d3SJohn Marino
256*86d7f5d3SJohn Marino if ((res & NS_TERMINATE) &&
257*86d7f5d3SJohn Marino (buffer != NULL)) {
258*86d7f5d3SJohn Marino configuration_lock_entry(
259*86d7f5d3SJohn Marino qstate->config_entry,
260*86d7f5d3SJohn Marino CELT_MULTIPART);
261*86d7f5d3SJohn Marino if (cache_mp_write(ws, buffer,
262*86d7f5d3SJohn Marino buffer_size) != 0) {
263*86d7f5d3SJohn Marino abandon_cache_mp_write_session(ws);
264*86d7f5d3SJohn Marino ws = NULL;
265*86d7f5d3SJohn Marino }
266*86d7f5d3SJohn Marino configuration_unlock_entry(
267*86d7f5d3SJohn Marino qstate->config_entry,
268*86d7f5d3SJohn Marino CELT_MULTIPART);
269*86d7f5d3SJohn Marino
270*86d7f5d3SJohn Marino free(buffer);
271*86d7f5d3SJohn Marino buffer = NULL;
272*86d7f5d3SJohn Marino } else {
273*86d7f5d3SJohn Marino configuration_lock_entry(
274*86d7f5d3SJohn Marino qstate->config_entry,
275*86d7f5d3SJohn Marino CELT_MULTIPART);
276*86d7f5d3SJohn Marino close_cache_mp_write_session(ws);
277*86d7f5d3SJohn Marino configuration_unlock_entry(
278*86d7f5d3SJohn Marino qstate->config_entry,
279*86d7f5d3SJohn Marino CELT_MULTIPART);
280*86d7f5d3SJohn Marino
281*86d7f5d3SJohn Marino free(buffer);
282*86d7f5d3SJohn Marino buffer = NULL;
283*86d7f5d3SJohn Marino }
284*86d7f5d3SJohn Marino } while ((res & NS_TERMINATE) &&
285*86d7f5d3SJohn Marino (ws != NULL));
286*86d7f5d3SJohn Marino }
287*86d7f5d3SJohn Marino
288*86d7f5d3SJohn Marino configuration_lock_entry(qstate->config_entry,
289*86d7f5d3SJohn Marino CELT_MULTIPART);
290*86d7f5d3SJohn Marino rs = open_cache_mp_read_session(c_entry);
291*86d7f5d3SJohn Marino configuration_unlock_entry(qstate->config_entry,
292*86d7f5d3SJohn Marino CELT_MULTIPART);
293*86d7f5d3SJohn Marino }
294*86d7f5d3SJohn Marino }
295*86d7f5d3SJohn Marino
296*86d7f5d3SJohn Marino if (rs == INVALID_CACHE_MP_READ_SESSION)
297*86d7f5d3SJohn Marino c_mp_rs_response->error_code = -1;
298*86d7f5d3SJohn Marino else {
299*86d7f5d3SJohn Marino qstate->mdata = rs;
300*86d7f5d3SJohn Marino qstate->destroy_func = on_mp_read_session_destroy;
301*86d7f5d3SJohn Marino
302*86d7f5d3SJohn Marino configuration_lock_entry(qstate->config_entry,
303*86d7f5d3SJohn Marino CELT_MULTIPART);
304*86d7f5d3SJohn Marino if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
305*86d7f5d3SJohn Marino (qstate->config_entry->mp_query_timeout.tv_usec != 0))
306*86d7f5d3SJohn Marino memcpy(&qstate->timeout,
307*86d7f5d3SJohn Marino &qstate->config_entry->mp_query_timeout,
308*86d7f5d3SJohn Marino sizeof(struct timeval));
309*86d7f5d3SJohn Marino configuration_unlock_entry(qstate->config_entry,
310*86d7f5d3SJohn Marino CELT_MULTIPART);
311*86d7f5d3SJohn Marino }
312*86d7f5d3SJohn Marino } else
313*86d7f5d3SJohn Marino c_mp_rs_response->error_code = -1;
314*86d7f5d3SJohn Marino
315*86d7f5d3SJohn Marino fin:
316*86d7f5d3SJohn Marino qstate->process_func = on_mp_read_session_response_write1;
317*86d7f5d3SJohn Marino qstate->kevent_watermark = sizeof(int);
318*86d7f5d3SJohn Marino qstate->kevent_filter = EVFILT_WRITE;
319*86d7f5d3SJohn Marino
320*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_request_process);
321*86d7f5d3SJohn Marino return (0);
322*86d7f5d3SJohn Marino }
323*86d7f5d3SJohn Marino
324*86d7f5d3SJohn Marino static int
on_mp_read_session_response_write1(struct query_state * qstate)325*86d7f5d3SJohn Marino on_mp_read_session_response_write1(struct query_state *qstate)
326*86d7f5d3SJohn Marino {
327*86d7f5d3SJohn Marino struct cache_mp_read_session_response *c_mp_rs_response;
328*86d7f5d3SJohn Marino ssize_t result;
329*86d7f5d3SJohn Marino
330*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_response_write1);
331*86d7f5d3SJohn Marino c_mp_rs_response = get_cache_mp_read_session_response(
332*86d7f5d3SJohn Marino &qstate->response);
333*86d7f5d3SJohn Marino result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
334*86d7f5d3SJohn Marino sizeof(int));
335*86d7f5d3SJohn Marino
336*86d7f5d3SJohn Marino if (result != sizeof(int)) {
337*86d7f5d3SJohn Marino LOG_ERR_3("on_mp_read_session_response_write1",
338*86d7f5d3SJohn Marino "write failed");
339*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_response_write1);
340*86d7f5d3SJohn Marino return (-1);
341*86d7f5d3SJohn Marino }
342*86d7f5d3SJohn Marino
343*86d7f5d3SJohn Marino if (c_mp_rs_response->error_code == 0) {
344*86d7f5d3SJohn Marino qstate->kevent_watermark = sizeof(int);
345*86d7f5d3SJohn Marino qstate->process_func = on_mp_read_session_mapper;
346*86d7f5d3SJohn Marino qstate->kevent_filter = EVFILT_READ;
347*86d7f5d3SJohn Marino } else {
348*86d7f5d3SJohn Marino qstate->kevent_watermark = 0;
349*86d7f5d3SJohn Marino qstate->process_func = NULL;
350*86d7f5d3SJohn Marino }
351*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_response_write1);
352*86d7f5d3SJohn Marino return (0);
353*86d7f5d3SJohn Marino }
354*86d7f5d3SJohn Marino
355*86d7f5d3SJohn Marino /*
356*86d7f5d3SJohn Marino * Mapper function is used to avoid multiple connections for each session
357*86d7f5d3SJohn Marino * write or read requests. After processing the request, it does not close
358*86d7f5d3SJohn Marino * the connection, but waits for the next request.
359*86d7f5d3SJohn Marino */
360*86d7f5d3SJohn Marino static int
on_mp_read_session_mapper(struct query_state * qstate)361*86d7f5d3SJohn Marino on_mp_read_session_mapper(struct query_state *qstate)
362*86d7f5d3SJohn Marino {
363*86d7f5d3SJohn Marino ssize_t result;
364*86d7f5d3SJohn Marino int elem_type;
365*86d7f5d3SJohn Marino
366*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_mapper);
367*86d7f5d3SJohn Marino if (qstate->kevent_watermark == 0) {
368*86d7f5d3SJohn Marino qstate->kevent_watermark = sizeof(int);
369*86d7f5d3SJohn Marino } else {
370*86d7f5d3SJohn Marino result = qstate->read_func(qstate, &elem_type, sizeof(int));
371*86d7f5d3SJohn Marino if (result != sizeof(int)) {
372*86d7f5d3SJohn Marino LOG_ERR_3("on_mp_read_session_mapper",
373*86d7f5d3SJohn Marino "read failed");
374*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_mapper);
375*86d7f5d3SJohn Marino return (-1);
376*86d7f5d3SJohn Marino }
377*86d7f5d3SJohn Marino
378*86d7f5d3SJohn Marino switch (elem_type) {
379*86d7f5d3SJohn Marino case CET_MP_READ_SESSION_READ_REQUEST:
380*86d7f5d3SJohn Marino qstate->kevent_watermark = 0;
381*86d7f5d3SJohn Marino qstate->process_func =
382*86d7f5d3SJohn Marino on_mp_read_session_read_request_process;
383*86d7f5d3SJohn Marino break;
384*86d7f5d3SJohn Marino case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
385*86d7f5d3SJohn Marino qstate->kevent_watermark = 0;
386*86d7f5d3SJohn Marino qstate->process_func =
387*86d7f5d3SJohn Marino on_mp_read_session_close_notification;
388*86d7f5d3SJohn Marino break;
389*86d7f5d3SJohn Marino default:
390*86d7f5d3SJohn Marino qstate->kevent_watermark = 0;
391*86d7f5d3SJohn Marino qstate->process_func = NULL;
392*86d7f5d3SJohn Marino LOG_ERR_3("on_mp_read_session_mapper",
393*86d7f5d3SJohn Marino "unknown element type");
394*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_mapper);
395*86d7f5d3SJohn Marino return (-1);
396*86d7f5d3SJohn Marino }
397*86d7f5d3SJohn Marino }
398*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_mapper);
399*86d7f5d3SJohn Marino return (0);
400*86d7f5d3SJohn Marino }
401*86d7f5d3SJohn Marino
402*86d7f5d3SJohn Marino /*
403*86d7f5d3SJohn Marino * The functions below are used to process multipart read sessions read
404*86d7f5d3SJohn Marino * requests. User doesn't have to pass any kind of data, besides the
405*86d7f5d3SJohn Marino * request identificator itself. So we don't need any XXX_read functions and
406*86d7f5d3SJohn Marino * start with the XXX_process function.
407*86d7f5d3SJohn Marino * - on_mp_read_session_read_request_process processes it
408*86d7f5d3SJohn Marino * - on_mp_read_session_read_response_write1 and
409*86d7f5d3SJohn Marino * on_mp_read_session_read_response_write2 sends the response
410*86d7f5d3SJohn Marino */
411*86d7f5d3SJohn Marino static int
on_mp_read_session_read_request_process(struct query_state * qstate)412*86d7f5d3SJohn Marino on_mp_read_session_read_request_process(struct query_state *qstate)
413*86d7f5d3SJohn Marino {
414*86d7f5d3SJohn Marino struct cache_mp_read_session_read_response *read_response;
415*86d7f5d3SJohn Marino
416*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_response_process);
417*86d7f5d3SJohn Marino init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
418*86d7f5d3SJohn Marino read_response = get_cache_mp_read_session_read_response(
419*86d7f5d3SJohn Marino &qstate->response);
420*86d7f5d3SJohn Marino
421*86d7f5d3SJohn Marino configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
422*86d7f5d3SJohn Marino read_response->error_code = cache_mp_read(
423*86d7f5d3SJohn Marino (cache_mp_read_session)qstate->mdata, NULL,
424*86d7f5d3SJohn Marino &read_response->data_size);
425*86d7f5d3SJohn Marino
426*86d7f5d3SJohn Marino if (read_response->error_code == 0) {
427*86d7f5d3SJohn Marino read_response->data = (char *)malloc(read_response->data_size);
428*86d7f5d3SJohn Marino assert(read_response != NULL);
429*86d7f5d3SJohn Marino read_response->error_code = cache_mp_read(
430*86d7f5d3SJohn Marino (cache_mp_read_session)qstate->mdata,
431*86d7f5d3SJohn Marino read_response->data,
432*86d7f5d3SJohn Marino &read_response->data_size);
433*86d7f5d3SJohn Marino }
434*86d7f5d3SJohn Marino configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
435*86d7f5d3SJohn Marino
436*86d7f5d3SJohn Marino if (read_response->error_code == 0)
437*86d7f5d3SJohn Marino qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
438*86d7f5d3SJohn Marino else
439*86d7f5d3SJohn Marino qstate->kevent_watermark = sizeof(int);
440*86d7f5d3SJohn Marino qstate->process_func = on_mp_read_session_read_response_write1;
441*86d7f5d3SJohn Marino qstate->kevent_filter = EVFILT_WRITE;
442*86d7f5d3SJohn Marino
443*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_response_process);
444*86d7f5d3SJohn Marino return (0);
445*86d7f5d3SJohn Marino }
446*86d7f5d3SJohn Marino
447*86d7f5d3SJohn Marino static int
on_mp_read_session_read_response_write1(struct query_state * qstate)448*86d7f5d3SJohn Marino on_mp_read_session_read_response_write1(struct query_state *qstate)
449*86d7f5d3SJohn Marino {
450*86d7f5d3SJohn Marino struct cache_mp_read_session_read_response *read_response;
451*86d7f5d3SJohn Marino ssize_t result;
452*86d7f5d3SJohn Marino
453*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_read_response_write1);
454*86d7f5d3SJohn Marino read_response = get_cache_mp_read_session_read_response(
455*86d7f5d3SJohn Marino &qstate->response);
456*86d7f5d3SJohn Marino
457*86d7f5d3SJohn Marino result = qstate->write_func(qstate, &read_response->error_code,
458*86d7f5d3SJohn Marino sizeof(int));
459*86d7f5d3SJohn Marino if (read_response->error_code == 0) {
460*86d7f5d3SJohn Marino result += qstate->write_func(qstate, &read_response->data_size,
461*86d7f5d3SJohn Marino sizeof(size_t));
462*86d7f5d3SJohn Marino if (result != qstate->kevent_watermark) {
463*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_read_response_write1);
464*86d7f5d3SJohn Marino LOG_ERR_3("on_mp_read_session_read_response_write1",
465*86d7f5d3SJohn Marino "write failed");
466*86d7f5d3SJohn Marino return (-1);
467*86d7f5d3SJohn Marino }
468*86d7f5d3SJohn Marino
469*86d7f5d3SJohn Marino qstate->kevent_watermark = read_response->data_size;
470*86d7f5d3SJohn Marino qstate->process_func = on_mp_read_session_read_response_write2;
471*86d7f5d3SJohn Marino } else {
472*86d7f5d3SJohn Marino if (result != qstate->kevent_watermark) {
473*86d7f5d3SJohn Marino LOG_ERR_3("on_mp_read_session_read_response_write1",
474*86d7f5d3SJohn Marino "write failed");
475*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_read_response_write1);
476*86d7f5d3SJohn Marino return (-1);
477*86d7f5d3SJohn Marino }
478*86d7f5d3SJohn Marino
479*86d7f5d3SJohn Marino qstate->kevent_watermark = 0;
480*86d7f5d3SJohn Marino qstate->process_func = NULL;
481*86d7f5d3SJohn Marino }
482*86d7f5d3SJohn Marino
483*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_read_response_write1);
484*86d7f5d3SJohn Marino return (0);
485*86d7f5d3SJohn Marino }
486*86d7f5d3SJohn Marino
487*86d7f5d3SJohn Marino static int
on_mp_read_session_read_response_write2(struct query_state * qstate)488*86d7f5d3SJohn Marino on_mp_read_session_read_response_write2(struct query_state *qstate)
489*86d7f5d3SJohn Marino {
490*86d7f5d3SJohn Marino struct cache_mp_read_session_read_response *read_response;
491*86d7f5d3SJohn Marino ssize_t result;
492*86d7f5d3SJohn Marino
493*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_read_response_write2);
494*86d7f5d3SJohn Marino read_response = get_cache_mp_read_session_read_response(
495*86d7f5d3SJohn Marino &qstate->response);
496*86d7f5d3SJohn Marino result = qstate->write_func(qstate, read_response->data,
497*86d7f5d3SJohn Marino read_response->data_size);
498*86d7f5d3SJohn Marino if (result != qstate->kevent_watermark) {
499*86d7f5d3SJohn Marino LOG_ERR_3("on_mp_read_session_read_response_write2",
500*86d7f5d3SJohn Marino "write failed");
501*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_read_response_write2);
502*86d7f5d3SJohn Marino return (-1);
503*86d7f5d3SJohn Marino }
504*86d7f5d3SJohn Marino
505*86d7f5d3SJohn Marino finalize_comm_element(&qstate->request);
506*86d7f5d3SJohn Marino finalize_comm_element(&qstate->response);
507*86d7f5d3SJohn Marino
508*86d7f5d3SJohn Marino qstate->kevent_watermark = sizeof(int);
509*86d7f5d3SJohn Marino qstate->process_func = on_mp_read_session_mapper;
510*86d7f5d3SJohn Marino qstate->kevent_filter = EVFILT_READ;
511*86d7f5d3SJohn Marino
512*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_read_response_write2);
513*86d7f5d3SJohn Marino return (0);
514*86d7f5d3SJohn Marino }
515*86d7f5d3SJohn Marino
516*86d7f5d3SJohn Marino /*
517*86d7f5d3SJohn Marino * Handles session close notification by calling close_cache_mp_read_session
518*86d7f5d3SJohn Marino * function.
519*86d7f5d3SJohn Marino */
520*86d7f5d3SJohn Marino static int
on_mp_read_session_close_notification(struct query_state * qstate)521*86d7f5d3SJohn Marino on_mp_read_session_close_notification(struct query_state *qstate)
522*86d7f5d3SJohn Marino {
523*86d7f5d3SJohn Marino
524*86d7f5d3SJohn Marino TRACE_IN(on_mp_read_session_close_notification);
525*86d7f5d3SJohn Marino configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
526*86d7f5d3SJohn Marino close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
527*86d7f5d3SJohn Marino configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
528*86d7f5d3SJohn Marino qstate->mdata = NULL;
529*86d7f5d3SJohn Marino qstate->kevent_watermark = 0;
530*86d7f5d3SJohn Marino qstate->process_func = NULL;
531*86d7f5d3SJohn Marino TRACE_OUT(on_mp_read_session_close_notification);
532*86d7f5d3SJohn Marino return (0);
533*86d7f5d3SJohn Marino }
534