xref: /freebsd-src/usr.sbin/nscd/mp_rs_query.c (revision 4d65a7c6951cea0333f1a0c1b32c38489cdfa6c5)
106a99fe3SHajimu UMEMOTO /*-
206a99fe3SHajimu UMEMOTO  * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
306a99fe3SHajimu UMEMOTO  * All rights reserved.
406a99fe3SHajimu UMEMOTO  *
506a99fe3SHajimu UMEMOTO  * Redistribution and use in source and binary forms, with or without
606a99fe3SHajimu UMEMOTO  * modification, are permitted provided that the following conditions
706a99fe3SHajimu UMEMOTO  * are met:
806a99fe3SHajimu UMEMOTO  * 1. Redistributions of source code must retain the above copyright
906a99fe3SHajimu UMEMOTO  *    notice, this list of conditions and the following disclaimer.
1006a99fe3SHajimu UMEMOTO  * 2. Redistributions in binary form must reproduce the above copyright
1106a99fe3SHajimu UMEMOTO  *    notice, this list of conditions and the following disclaimer in the
1206a99fe3SHajimu UMEMOTO  *    documentation and/or other materials provided with the distribution.
1306a99fe3SHajimu UMEMOTO  *
1406a99fe3SHajimu UMEMOTO  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
1506a99fe3SHajimu UMEMOTO  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1606a99fe3SHajimu UMEMOTO  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1706a99fe3SHajimu UMEMOTO  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
1806a99fe3SHajimu UMEMOTO  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
1906a99fe3SHajimu UMEMOTO  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2006a99fe3SHajimu UMEMOTO  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2106a99fe3SHajimu UMEMOTO  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2206a99fe3SHajimu UMEMOTO  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2306a99fe3SHajimu UMEMOTO  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2406a99fe3SHajimu UMEMOTO  * SUCH DAMAGE.
2506a99fe3SHajimu UMEMOTO  *
2606a99fe3SHajimu UMEMOTO  */
2706a99fe3SHajimu UMEMOTO 
2806a99fe3SHajimu UMEMOTO #include <sys/types.h>
2906a99fe3SHajimu UMEMOTO #include <sys/event.h>
3028f805ceSDag-Erling Smørgrav #include <sys/socket.h>
3128f805ceSDag-Erling Smørgrav #include <sys/time.h>
3228f805ceSDag-Erling Smørgrav 
3306a99fe3SHajimu UMEMOTO #include <assert.h>
3406a99fe3SHajimu UMEMOTO #include <errno.h>
3528f805ceSDag-Erling Smørgrav #include <nsswitch.h>
3628f805ceSDag-Erling Smørgrav #include <stdio.h>
3706a99fe3SHajimu UMEMOTO #include <stdlib.h>
3806a99fe3SHajimu UMEMOTO #include <string.h>
3906a99fe3SHajimu UMEMOTO 
4006a99fe3SHajimu UMEMOTO #include "cachelib.h"
4106a99fe3SHajimu UMEMOTO #include "config.h"
4206a99fe3SHajimu UMEMOTO #include "debug.h"
4306a99fe3SHajimu UMEMOTO #include "log.h"
4406a99fe3SHajimu UMEMOTO #include "query.h"
4506a99fe3SHajimu UMEMOTO #include "mp_rs_query.h"
4606a99fe3SHajimu UMEMOTO #include "mp_ws_query.h"
4706a99fe3SHajimu UMEMOTO #include "singletons.h"
4806a99fe3SHajimu UMEMOTO 
4906a99fe3SHajimu UMEMOTO static int on_mp_read_session_close_notification(struct query_state *);
5006a99fe3SHajimu UMEMOTO static void on_mp_read_session_destroy(struct query_state *);
5106a99fe3SHajimu UMEMOTO static int on_mp_read_session_mapper(struct query_state *);
5206a99fe3SHajimu UMEMOTO /* int on_mp_read_session_request_read1(struct query_state *); */
5306a99fe3SHajimu UMEMOTO static int on_mp_read_session_request_read2(struct query_state *);
5406a99fe3SHajimu UMEMOTO static int on_mp_read_session_request_process(struct query_state *);
5506a99fe3SHajimu UMEMOTO static int on_mp_read_session_response_write1(struct query_state *);
5606a99fe3SHajimu UMEMOTO static int on_mp_read_session_read_request_process(struct query_state *);
5706a99fe3SHajimu UMEMOTO static int on_mp_read_session_read_response_write1(struct query_state *);
5806a99fe3SHajimu UMEMOTO static int on_mp_read_session_read_response_write2(struct query_state *);
5906a99fe3SHajimu UMEMOTO 
6006a99fe3SHajimu UMEMOTO /*
6106a99fe3SHajimu UMEMOTO  * This function is used as the query_state's destroy_func to make the
6206a99fe3SHajimu UMEMOTO  * proper cleanup in case of errors.
6306a99fe3SHajimu UMEMOTO  */
6406a99fe3SHajimu UMEMOTO static void
on_mp_read_session_destroy(struct query_state * qstate)6506a99fe3SHajimu UMEMOTO on_mp_read_session_destroy(struct query_state *qstate)
6606a99fe3SHajimu UMEMOTO {
6706a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_destroy);
6806a99fe3SHajimu UMEMOTO 	finalize_comm_element(&qstate->request);
6906a99fe3SHajimu UMEMOTO 	finalize_comm_element(&qstate->response);
7006a99fe3SHajimu UMEMOTO 
7106a99fe3SHajimu UMEMOTO 	if (qstate->mdata != NULL) {
7206a99fe3SHajimu UMEMOTO 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
7306a99fe3SHajimu UMEMOTO 		close_cache_mp_read_session(
7406a99fe3SHajimu UMEMOTO 	    		(cache_mp_read_session)qstate->mdata);
7506a99fe3SHajimu UMEMOTO 		configuration_unlock_entry(qstate->config_entry,
7606a99fe3SHajimu UMEMOTO 			CELT_MULTIPART);
7706a99fe3SHajimu UMEMOTO 	}
7806a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_destroy);
7906a99fe3SHajimu UMEMOTO }
8006a99fe3SHajimu UMEMOTO 
8106a99fe3SHajimu UMEMOTO /*
8206a99fe3SHajimu UMEMOTO  * The functions below are used to process multipart read session initiation
8306a99fe3SHajimu UMEMOTO  * requests.
8406a99fe3SHajimu UMEMOTO  * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
8506a99fe3SHajimu UMEMOTO  *   the request itself
8606a99fe3SHajimu UMEMOTO  * - on_mp_read_session_request_process processes it
8706a99fe3SHajimu UMEMOTO  * - on_mp_read_session_response_write1 sends the response
8806a99fe3SHajimu UMEMOTO  */
8906a99fe3SHajimu UMEMOTO int
on_mp_read_session_request_read1(struct query_state * qstate)9006a99fe3SHajimu UMEMOTO on_mp_read_session_request_read1(struct query_state *qstate)
9106a99fe3SHajimu UMEMOTO {
9206a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_request	*c_mp_rs_request;
9306a99fe3SHajimu UMEMOTO 	ssize_t	result;
9406a99fe3SHajimu UMEMOTO 
9506a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_request_read1);
9606a99fe3SHajimu UMEMOTO 	if (qstate->kevent_watermark == 0)
9706a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = sizeof(size_t);
9806a99fe3SHajimu UMEMOTO 	else {
9906a99fe3SHajimu UMEMOTO 		init_comm_element(&qstate->request,
10006a99fe3SHajimu UMEMOTO 	    		CET_MP_READ_SESSION_REQUEST);
10106a99fe3SHajimu UMEMOTO 		c_mp_rs_request = get_cache_mp_read_session_request(
10206a99fe3SHajimu UMEMOTO 	    		&qstate->request);
10306a99fe3SHajimu UMEMOTO 
10406a99fe3SHajimu UMEMOTO 		result = qstate->read_func(qstate,
10506a99fe3SHajimu UMEMOTO 	    		&c_mp_rs_request->entry_length, sizeof(size_t));
10606a99fe3SHajimu UMEMOTO 
10706a99fe3SHajimu UMEMOTO 		if (result != sizeof(size_t)) {
10806a99fe3SHajimu UMEMOTO 			TRACE_OUT(on_mp_read_session_request_read1);
10906a99fe3SHajimu UMEMOTO 			return (-1);
11006a99fe3SHajimu UMEMOTO 		}
11106a99fe3SHajimu UMEMOTO 
11206a99fe3SHajimu UMEMOTO 		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
11306a99fe3SHajimu UMEMOTO 			TRACE_OUT(on_mp_read_session_request_read1);
11406a99fe3SHajimu UMEMOTO 			return (-1);
11506a99fe3SHajimu UMEMOTO 		}
11606a99fe3SHajimu UMEMOTO 
117*8eeaaffaSDag-Erling Smørgrav 		c_mp_rs_request->entry = calloc(1,
11806a99fe3SHajimu UMEMOTO 			c_mp_rs_request->entry_length + 1);
11906a99fe3SHajimu UMEMOTO 		assert(c_mp_rs_request->entry != NULL);
12006a99fe3SHajimu UMEMOTO 
12106a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = c_mp_rs_request->entry_length;
12206a99fe3SHajimu UMEMOTO 		qstate->process_func = on_mp_read_session_request_read2;
12306a99fe3SHajimu UMEMOTO 	}
12406a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_request_read1);
12506a99fe3SHajimu UMEMOTO 	return (0);
12606a99fe3SHajimu UMEMOTO }
12706a99fe3SHajimu UMEMOTO 
12806a99fe3SHajimu UMEMOTO static int
on_mp_read_session_request_read2(struct query_state * qstate)12906a99fe3SHajimu UMEMOTO on_mp_read_session_request_read2(struct query_state *qstate)
13006a99fe3SHajimu UMEMOTO {
13106a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_request	*c_mp_rs_request;
13206a99fe3SHajimu UMEMOTO 	ssize_t	result;
13306a99fe3SHajimu UMEMOTO 
13406a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_request_read2);
13506a99fe3SHajimu UMEMOTO 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
13606a99fe3SHajimu UMEMOTO 
13706a99fe3SHajimu UMEMOTO 	result = qstate->read_func(qstate, c_mp_rs_request->entry,
13806a99fe3SHajimu UMEMOTO 		c_mp_rs_request->entry_length);
13906a99fe3SHajimu UMEMOTO 
14051d6ddb5SDag-Erling Smørgrav 	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
14106a99fe3SHajimu UMEMOTO 		LOG_ERR_3("on_mp_read_session_request_read2",
14206a99fe3SHajimu UMEMOTO 			"read failed");
14306a99fe3SHajimu UMEMOTO 		TRACE_OUT(on_mp_read_session_request_read2);
14406a99fe3SHajimu UMEMOTO 		return (-1);
14506a99fe3SHajimu UMEMOTO 	}
14606a99fe3SHajimu UMEMOTO 
14706a99fe3SHajimu UMEMOTO 	qstate->kevent_watermark = 0;
14806a99fe3SHajimu UMEMOTO 	qstate->process_func = on_mp_read_session_request_process;
14906a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_request_read2);
15006a99fe3SHajimu UMEMOTO 	return (0);
15106a99fe3SHajimu UMEMOTO }
15206a99fe3SHajimu UMEMOTO 
15306a99fe3SHajimu UMEMOTO static int
on_mp_read_session_request_process(struct query_state * qstate)15406a99fe3SHajimu UMEMOTO on_mp_read_session_request_process(struct query_state *qstate)
15506a99fe3SHajimu UMEMOTO {
15606a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_request	*c_mp_rs_request;
15706a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_response	*c_mp_rs_response;
15806a99fe3SHajimu UMEMOTO 	cache_mp_read_session	rs;
15906a99fe3SHajimu UMEMOTO 	cache_entry	c_entry;
16006a99fe3SHajimu UMEMOTO 	char	*dec_cache_entry_name;
16106a99fe3SHajimu UMEMOTO 
16206a99fe3SHajimu UMEMOTO 	char *buffer;
16306a99fe3SHajimu UMEMOTO 	size_t buffer_size;
16406a99fe3SHajimu UMEMOTO 	cache_mp_write_session ws;
16506a99fe3SHajimu UMEMOTO 	struct agent	*lookup_agent;
16606a99fe3SHajimu UMEMOTO 	struct multipart_agent *mp_agent;
16706a99fe3SHajimu UMEMOTO 	void *mdata;
16806a99fe3SHajimu UMEMOTO 	int res;
16906a99fe3SHajimu UMEMOTO 
17006a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_request_process);
17106a99fe3SHajimu UMEMOTO 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
17206a99fe3SHajimu UMEMOTO 	c_mp_rs_response = get_cache_mp_read_session_response(
17306a99fe3SHajimu UMEMOTO 		&qstate->response);
17406a99fe3SHajimu UMEMOTO 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
17506a99fe3SHajimu UMEMOTO 
17606a99fe3SHajimu UMEMOTO 	qstate->config_entry = configuration_find_entry(
17706a99fe3SHajimu UMEMOTO 		s_configuration, c_mp_rs_request->entry);
17806a99fe3SHajimu UMEMOTO 	if (qstate->config_entry == NULL) {
17906a99fe3SHajimu UMEMOTO 		c_mp_rs_response->error_code = ENOENT;
18006a99fe3SHajimu UMEMOTO 
18106a99fe3SHajimu UMEMOTO 		LOG_ERR_2("read_session_request",
18206a99fe3SHajimu UMEMOTO 			"can't find configuration entry '%s'."
18306a99fe3SHajimu UMEMOTO 			" aborting request", c_mp_rs_request->entry);
18406a99fe3SHajimu UMEMOTO 		goto fin;
18506a99fe3SHajimu UMEMOTO 	}
18606a99fe3SHajimu UMEMOTO 
18706a99fe3SHajimu UMEMOTO 	if (qstate->config_entry->enabled == 0) {
18806a99fe3SHajimu UMEMOTO 		c_mp_rs_response->error_code = EACCES;
18906a99fe3SHajimu UMEMOTO 
19006a99fe3SHajimu UMEMOTO 		LOG_ERR_2("read_session_request",
19106a99fe3SHajimu UMEMOTO 			"configuration entry '%s' is disabled",
19206a99fe3SHajimu UMEMOTO 			c_mp_rs_request->entry);
19306a99fe3SHajimu UMEMOTO 		goto fin;
19406a99fe3SHajimu UMEMOTO 	}
19506a99fe3SHajimu UMEMOTO 
19606a99fe3SHajimu UMEMOTO 	if (qstate->config_entry->perform_actual_lookups != 0)
19706a99fe3SHajimu UMEMOTO 		dec_cache_entry_name = strdup(
19827f2bc9eSDag-Erling Smørgrav 			qstate->config_entry->mp_cache_params.cep.entry_name);
19906a99fe3SHajimu UMEMOTO 	else {
200db1bdf2bSMichael Bushkov #ifdef NS_NSCD_EID_CHECKING
20106a99fe3SHajimu UMEMOTO 		if (check_query_eids(qstate) != 0) {
20206a99fe3SHajimu UMEMOTO 			c_mp_rs_response->error_code = EPERM;
20306a99fe3SHajimu UMEMOTO 			goto fin;
20406a99fe3SHajimu UMEMOTO 		}
20506a99fe3SHajimu UMEMOTO #endif
20606a99fe3SHajimu UMEMOTO 
20706a99fe3SHajimu UMEMOTO 		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
20827f2bc9eSDag-Erling Smørgrav 			qstate->config_entry->mp_cache_params.cep.entry_name);
20906a99fe3SHajimu UMEMOTO 	}
21006a99fe3SHajimu UMEMOTO 
21106a99fe3SHajimu UMEMOTO 	assert(dec_cache_entry_name != NULL);
21206a99fe3SHajimu UMEMOTO 
21306a99fe3SHajimu UMEMOTO 	configuration_lock_rdlock(s_configuration);
21406a99fe3SHajimu UMEMOTO 	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
21506a99fe3SHajimu UMEMOTO 	configuration_unlock(s_configuration);
21606a99fe3SHajimu UMEMOTO 
21706a99fe3SHajimu UMEMOTO 	if ((c_entry == INVALID_CACHE) &&
21806a99fe3SHajimu UMEMOTO 	   (qstate->config_entry->perform_actual_lookups != 0))
21906a99fe3SHajimu UMEMOTO 		c_entry = register_new_mp_cache_entry(qstate,
22006a99fe3SHajimu UMEMOTO 			dec_cache_entry_name);
22106a99fe3SHajimu UMEMOTO 
22206a99fe3SHajimu UMEMOTO 	free(dec_cache_entry_name);
22306a99fe3SHajimu UMEMOTO 
22406a99fe3SHajimu UMEMOTO 	if (c_entry != INVALID_CACHE_ENTRY) {
22506a99fe3SHajimu UMEMOTO 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
22606a99fe3SHajimu UMEMOTO 		rs = open_cache_mp_read_session(c_entry);
22706a99fe3SHajimu UMEMOTO 		configuration_unlock_entry(qstate->config_entry,
22806a99fe3SHajimu UMEMOTO 			CELT_MULTIPART);
22906a99fe3SHajimu UMEMOTO 
23006a99fe3SHajimu UMEMOTO 		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
23106a99fe3SHajimu UMEMOTO 		   (qstate->config_entry->perform_actual_lookups != 0)) {
23206a99fe3SHajimu UMEMOTO 			lookup_agent = find_agent(s_agent_table,
23306a99fe3SHajimu UMEMOTO 				c_mp_rs_request->entry, MULTIPART_AGENT);
23406a99fe3SHajimu UMEMOTO 
23506a99fe3SHajimu UMEMOTO 			if ((lookup_agent != NULL) &&
23606a99fe3SHajimu UMEMOTO 			(lookup_agent->type == MULTIPART_AGENT)) {
23706a99fe3SHajimu UMEMOTO 				mp_agent = (struct multipart_agent *)
23806a99fe3SHajimu UMEMOTO 					lookup_agent;
23906a99fe3SHajimu UMEMOTO 				mdata = mp_agent->mp_init_func();
24006a99fe3SHajimu UMEMOTO 
24106a99fe3SHajimu UMEMOTO 				/*
24206a99fe3SHajimu UMEMOTO 				 * Multipart agents read the whole snapshot
24306a99fe3SHajimu UMEMOTO 				 * of the data at one time.
24406a99fe3SHajimu UMEMOTO 				 */
24506a99fe3SHajimu UMEMOTO 				configuration_lock_entry(qstate->config_entry,
24606a99fe3SHajimu UMEMOTO 					CELT_MULTIPART);
24706a99fe3SHajimu UMEMOTO 				ws = open_cache_mp_write_session(c_entry);
24806a99fe3SHajimu UMEMOTO 				configuration_unlock_entry(qstate->config_entry,
24906a99fe3SHajimu UMEMOTO 					CELT_MULTIPART);
25006a99fe3SHajimu UMEMOTO 				if (ws != NULL) {
25106a99fe3SHajimu UMEMOTO 				    do {
25206a99fe3SHajimu UMEMOTO 					buffer = NULL;
25306a99fe3SHajimu UMEMOTO 					res = mp_agent->mp_lookup_func(&buffer,
25406a99fe3SHajimu UMEMOTO 						&buffer_size,
25506a99fe3SHajimu UMEMOTO 						mdata);
25606a99fe3SHajimu UMEMOTO 
25706a99fe3SHajimu UMEMOTO 					if ((res & NS_TERMINATE) &&
25806a99fe3SHajimu UMEMOTO 					   (buffer != NULL)) {
25906a99fe3SHajimu UMEMOTO 						configuration_lock_entry(
26006a99fe3SHajimu UMEMOTO 							qstate->config_entry,
26106a99fe3SHajimu UMEMOTO 						   	CELT_MULTIPART);
26206a99fe3SHajimu UMEMOTO 						if (cache_mp_write(ws, buffer,
26306a99fe3SHajimu UMEMOTO 						    buffer_size) != 0) {
26406a99fe3SHajimu UMEMOTO 							abandon_cache_mp_write_session(ws);
26506a99fe3SHajimu UMEMOTO 							ws = NULL;
26606a99fe3SHajimu UMEMOTO 						}
26706a99fe3SHajimu UMEMOTO 						configuration_unlock_entry(
26806a99fe3SHajimu UMEMOTO 							qstate->config_entry,
26906a99fe3SHajimu UMEMOTO 							CELT_MULTIPART);
27006a99fe3SHajimu UMEMOTO 
27106a99fe3SHajimu UMEMOTO 						free(buffer);
27206a99fe3SHajimu UMEMOTO 						buffer = NULL;
27306a99fe3SHajimu UMEMOTO 					} else {
27406a99fe3SHajimu UMEMOTO 						configuration_lock_entry(
27506a99fe3SHajimu UMEMOTO 							qstate->config_entry,
27606a99fe3SHajimu UMEMOTO 							CELT_MULTIPART);
27706a99fe3SHajimu UMEMOTO 						close_cache_mp_write_session(ws);
27806a99fe3SHajimu UMEMOTO 						configuration_unlock_entry(
27906a99fe3SHajimu UMEMOTO 							qstate->config_entry,
28006a99fe3SHajimu UMEMOTO 							CELT_MULTIPART);
28106a99fe3SHajimu UMEMOTO 
28206a99fe3SHajimu UMEMOTO 						free(buffer);
28306a99fe3SHajimu UMEMOTO 						buffer = NULL;
28406a99fe3SHajimu UMEMOTO 					}
28506a99fe3SHajimu UMEMOTO 				    } while ((res & NS_TERMINATE) &&
28606a99fe3SHajimu UMEMOTO 				    	    (ws != NULL));
28706a99fe3SHajimu UMEMOTO 				}
28806a99fe3SHajimu UMEMOTO 
28906a99fe3SHajimu UMEMOTO 				configuration_lock_entry(qstate->config_entry,
29006a99fe3SHajimu UMEMOTO 					CELT_MULTIPART);
29106a99fe3SHajimu UMEMOTO 				rs = open_cache_mp_read_session(c_entry);
29206a99fe3SHajimu UMEMOTO 				configuration_unlock_entry(qstate->config_entry,
29306a99fe3SHajimu UMEMOTO 					CELT_MULTIPART);
29406a99fe3SHajimu UMEMOTO 			}
29506a99fe3SHajimu UMEMOTO 		}
29606a99fe3SHajimu UMEMOTO 
29706a99fe3SHajimu UMEMOTO 		if (rs == INVALID_CACHE_MP_READ_SESSION)
29806a99fe3SHajimu UMEMOTO 			c_mp_rs_response->error_code = -1;
29906a99fe3SHajimu UMEMOTO 		else {
30006a99fe3SHajimu UMEMOTO 		    qstate->mdata = rs;
30106a99fe3SHajimu UMEMOTO 		    qstate->destroy_func = on_mp_read_session_destroy;
30206a99fe3SHajimu UMEMOTO 
30306a99fe3SHajimu UMEMOTO 		    configuration_lock_entry(qstate->config_entry,
30406a99fe3SHajimu UMEMOTO 			CELT_MULTIPART);
30506a99fe3SHajimu UMEMOTO 		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
30606a99fe3SHajimu UMEMOTO 		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
30706a99fe3SHajimu UMEMOTO 			memcpy(&qstate->timeout,
30806a99fe3SHajimu UMEMOTO 			    &qstate->config_entry->mp_query_timeout,
30906a99fe3SHajimu UMEMOTO 			    sizeof(struct timeval));
31006a99fe3SHajimu UMEMOTO 		    configuration_unlock_entry(qstate->config_entry,
31106a99fe3SHajimu UMEMOTO 			CELT_MULTIPART);
31206a99fe3SHajimu UMEMOTO 		}
31306a99fe3SHajimu UMEMOTO 	} else
31406a99fe3SHajimu UMEMOTO 		c_mp_rs_response->error_code = -1;
31506a99fe3SHajimu UMEMOTO 
31606a99fe3SHajimu UMEMOTO fin:
31706a99fe3SHajimu UMEMOTO 	qstate->process_func = on_mp_read_session_response_write1;
31806a99fe3SHajimu UMEMOTO 	qstate->kevent_watermark = sizeof(int);
31906a99fe3SHajimu UMEMOTO 	qstate->kevent_filter = EVFILT_WRITE;
32006a99fe3SHajimu UMEMOTO 
32106a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_request_process);
32206a99fe3SHajimu UMEMOTO 	return (0);
32306a99fe3SHajimu UMEMOTO }
32406a99fe3SHajimu UMEMOTO 
32506a99fe3SHajimu UMEMOTO static int
on_mp_read_session_response_write1(struct query_state * qstate)32606a99fe3SHajimu UMEMOTO on_mp_read_session_response_write1(struct query_state *qstate)
32706a99fe3SHajimu UMEMOTO {
32806a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_response	*c_mp_rs_response;
32906a99fe3SHajimu UMEMOTO 	ssize_t	result;
33006a99fe3SHajimu UMEMOTO 
33106a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_response_write1);
33206a99fe3SHajimu UMEMOTO 	c_mp_rs_response = get_cache_mp_read_session_response(
33306a99fe3SHajimu UMEMOTO 		&qstate->response);
33406a99fe3SHajimu UMEMOTO 	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
33506a99fe3SHajimu UMEMOTO 		sizeof(int));
33606a99fe3SHajimu UMEMOTO 
33706a99fe3SHajimu UMEMOTO 	if (result != sizeof(int)) {
33806a99fe3SHajimu UMEMOTO 		LOG_ERR_3("on_mp_read_session_response_write1",
33906a99fe3SHajimu UMEMOTO 			"write failed");
34006a99fe3SHajimu UMEMOTO 		TRACE_OUT(on_mp_read_session_response_write1);
34106a99fe3SHajimu UMEMOTO 		return (-1);
34206a99fe3SHajimu UMEMOTO 	}
34306a99fe3SHajimu UMEMOTO 
34406a99fe3SHajimu UMEMOTO 	if (c_mp_rs_response->error_code == 0) {
34506a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = sizeof(int);
34606a99fe3SHajimu UMEMOTO 		qstate->process_func = on_mp_read_session_mapper;
34706a99fe3SHajimu UMEMOTO 		qstate->kevent_filter = EVFILT_READ;
34806a99fe3SHajimu UMEMOTO 	} else {
34906a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = 0;
35006a99fe3SHajimu UMEMOTO 		qstate->process_func = NULL;
35106a99fe3SHajimu UMEMOTO 	}
35206a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_response_write1);
35306a99fe3SHajimu UMEMOTO 	return (0);
35406a99fe3SHajimu UMEMOTO }
35506a99fe3SHajimu UMEMOTO 
35606a99fe3SHajimu UMEMOTO /*
35706a99fe3SHajimu UMEMOTO  * Mapper function is used to avoid multiple connections for each session
35806a99fe3SHajimu UMEMOTO  * write or read requests. After processing the request, it does not close
35906a99fe3SHajimu UMEMOTO  * the connection, but waits for the next request.
36006a99fe3SHajimu UMEMOTO  */
36106a99fe3SHajimu UMEMOTO static int
on_mp_read_session_mapper(struct query_state * qstate)36206a99fe3SHajimu UMEMOTO on_mp_read_session_mapper(struct query_state *qstate)
36306a99fe3SHajimu UMEMOTO {
36406a99fe3SHajimu UMEMOTO 	ssize_t	result;
36506a99fe3SHajimu UMEMOTO 	int elem_type;
36606a99fe3SHajimu UMEMOTO 
36706a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_mapper);
36806a99fe3SHajimu UMEMOTO 	if (qstate->kevent_watermark == 0) {
36906a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = sizeof(int);
37006a99fe3SHajimu UMEMOTO 	} else {
37106a99fe3SHajimu UMEMOTO 		result = qstate->read_func(qstate, &elem_type, sizeof(int));
37206a99fe3SHajimu UMEMOTO 		if (result != sizeof(int)) {
37306a99fe3SHajimu UMEMOTO 			LOG_ERR_3("on_mp_read_session_mapper",
37406a99fe3SHajimu UMEMOTO 				"read failed");
37506a99fe3SHajimu UMEMOTO 			TRACE_OUT(on_mp_read_session_mapper);
37606a99fe3SHajimu UMEMOTO 			return (-1);
37706a99fe3SHajimu UMEMOTO 		}
37806a99fe3SHajimu UMEMOTO 
37906a99fe3SHajimu UMEMOTO 		switch (elem_type) {
38006a99fe3SHajimu UMEMOTO 		case CET_MP_READ_SESSION_READ_REQUEST:
38106a99fe3SHajimu UMEMOTO 			qstate->kevent_watermark = 0;
38206a99fe3SHajimu UMEMOTO 			qstate->process_func =
38306a99fe3SHajimu UMEMOTO 				on_mp_read_session_read_request_process;
38406a99fe3SHajimu UMEMOTO 			break;
38506a99fe3SHajimu UMEMOTO 		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
38606a99fe3SHajimu UMEMOTO 			qstate->kevent_watermark = 0;
38706a99fe3SHajimu UMEMOTO 			qstate->process_func =
38806a99fe3SHajimu UMEMOTO 				on_mp_read_session_close_notification;
38906a99fe3SHajimu UMEMOTO 			break;
39006a99fe3SHajimu UMEMOTO 		default:
39106a99fe3SHajimu UMEMOTO 			qstate->kevent_watermark = 0;
39206a99fe3SHajimu UMEMOTO 			qstate->process_func = NULL;
39306a99fe3SHajimu UMEMOTO 			LOG_ERR_3("on_mp_read_session_mapper",
39406a99fe3SHajimu UMEMOTO 				"unknown element type");
39506a99fe3SHajimu UMEMOTO 			TRACE_OUT(on_mp_read_session_mapper);
39606a99fe3SHajimu UMEMOTO 			return (-1);
39706a99fe3SHajimu UMEMOTO 		}
39806a99fe3SHajimu UMEMOTO 	}
39906a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_mapper);
40006a99fe3SHajimu UMEMOTO 	return (0);
40106a99fe3SHajimu UMEMOTO }
40206a99fe3SHajimu UMEMOTO 
40306a99fe3SHajimu UMEMOTO /*
40406a99fe3SHajimu UMEMOTO  * The functions below are used to process multipart read sessions read
40506a99fe3SHajimu UMEMOTO  * requests. User doesn't have to pass any kind of data, besides the
40606a99fe3SHajimu UMEMOTO  * request identificator itself. So we don't need any XXX_read functions and
40706a99fe3SHajimu UMEMOTO  * start with the XXX_process function.
40806a99fe3SHajimu UMEMOTO  * - on_mp_read_session_read_request_process processes it
40906a99fe3SHajimu UMEMOTO  * - on_mp_read_session_read_response_write1 and
41006a99fe3SHajimu UMEMOTO  *   on_mp_read_session_read_response_write2 sends the response
41106a99fe3SHajimu UMEMOTO  */
41206a99fe3SHajimu UMEMOTO static int
on_mp_read_session_read_request_process(struct query_state * qstate)41306a99fe3SHajimu UMEMOTO on_mp_read_session_read_request_process(struct query_state *qstate)
41406a99fe3SHajimu UMEMOTO {
41506a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_read_response	*read_response;
41606a99fe3SHajimu UMEMOTO 
41706a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_response_process);
41806a99fe3SHajimu UMEMOTO 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
41906a99fe3SHajimu UMEMOTO 	read_response = get_cache_mp_read_session_read_response(
42006a99fe3SHajimu UMEMOTO 		&qstate->response);
42106a99fe3SHajimu UMEMOTO 
42206a99fe3SHajimu UMEMOTO 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
42306a99fe3SHajimu UMEMOTO 	read_response->error_code = cache_mp_read(
42406a99fe3SHajimu UMEMOTO 		(cache_mp_read_session)qstate->mdata, NULL,
42506a99fe3SHajimu UMEMOTO 		&read_response->data_size);
42606a99fe3SHajimu UMEMOTO 
42706a99fe3SHajimu UMEMOTO 	if (read_response->error_code == 0) {
428*8eeaaffaSDag-Erling Smørgrav 		read_response->data = malloc(read_response->data_size);
42906a99fe3SHajimu UMEMOTO 		assert(read_response != NULL);
43006a99fe3SHajimu UMEMOTO 		read_response->error_code = cache_mp_read(
43106a99fe3SHajimu UMEMOTO 			(cache_mp_read_session)qstate->mdata,
43206a99fe3SHajimu UMEMOTO 	    		read_response->data,
43306a99fe3SHajimu UMEMOTO 			&read_response->data_size);
43406a99fe3SHajimu UMEMOTO 	}
43506a99fe3SHajimu UMEMOTO 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
43606a99fe3SHajimu UMEMOTO 
43706a99fe3SHajimu UMEMOTO 	if (read_response->error_code == 0)
43806a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
43906a99fe3SHajimu UMEMOTO 	else
44006a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = sizeof(int);
44106a99fe3SHajimu UMEMOTO 	qstate->process_func = on_mp_read_session_read_response_write1;
44206a99fe3SHajimu UMEMOTO 	qstate->kevent_filter = EVFILT_WRITE;
44306a99fe3SHajimu UMEMOTO 
44406a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_response_process);
44506a99fe3SHajimu UMEMOTO 	return (0);
44606a99fe3SHajimu UMEMOTO }
44706a99fe3SHajimu UMEMOTO 
44806a99fe3SHajimu UMEMOTO static int
on_mp_read_session_read_response_write1(struct query_state * qstate)44906a99fe3SHajimu UMEMOTO on_mp_read_session_read_response_write1(struct query_state *qstate)
45006a99fe3SHajimu UMEMOTO {
45106a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_read_response	*read_response;
45206a99fe3SHajimu UMEMOTO 	ssize_t	result;
45306a99fe3SHajimu UMEMOTO 
45406a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_read_response_write1);
45506a99fe3SHajimu UMEMOTO 	read_response = get_cache_mp_read_session_read_response(
45606a99fe3SHajimu UMEMOTO 		&qstate->response);
45706a99fe3SHajimu UMEMOTO 
45806a99fe3SHajimu UMEMOTO 	result = qstate->write_func(qstate, &read_response->error_code,
45906a99fe3SHajimu UMEMOTO 		sizeof(int));
46006a99fe3SHajimu UMEMOTO 	if (read_response->error_code == 0) {
46106a99fe3SHajimu UMEMOTO 		result += qstate->write_func(qstate, &read_response->data_size,
46206a99fe3SHajimu UMEMOTO 			sizeof(size_t));
46351d6ddb5SDag-Erling Smørgrav 		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
46406a99fe3SHajimu UMEMOTO 			TRACE_OUT(on_mp_read_session_read_response_write1);
46506a99fe3SHajimu UMEMOTO 			LOG_ERR_3("on_mp_read_session_read_response_write1",
46606a99fe3SHajimu UMEMOTO 				"write failed");
46706a99fe3SHajimu UMEMOTO 			return (-1);
46806a99fe3SHajimu UMEMOTO 		}
46906a99fe3SHajimu UMEMOTO 
47006a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = read_response->data_size;
47106a99fe3SHajimu UMEMOTO 		qstate->process_func = on_mp_read_session_read_response_write2;
47206a99fe3SHajimu UMEMOTO 	} else {
47351d6ddb5SDag-Erling Smørgrav 		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
47406a99fe3SHajimu UMEMOTO 			LOG_ERR_3("on_mp_read_session_read_response_write1",
47506a99fe3SHajimu UMEMOTO 				"write failed");
47606a99fe3SHajimu UMEMOTO 			TRACE_OUT(on_mp_read_session_read_response_write1);
47706a99fe3SHajimu UMEMOTO 			return (-1);
47806a99fe3SHajimu UMEMOTO 		}
47906a99fe3SHajimu UMEMOTO 
48006a99fe3SHajimu UMEMOTO 		qstate->kevent_watermark = 0;
48106a99fe3SHajimu UMEMOTO 		qstate->process_func = NULL;
48206a99fe3SHajimu UMEMOTO 	}
48306a99fe3SHajimu UMEMOTO 
48406a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_read_response_write1);
48506a99fe3SHajimu UMEMOTO 	return (0);
48606a99fe3SHajimu UMEMOTO }
48706a99fe3SHajimu UMEMOTO 
48806a99fe3SHajimu UMEMOTO static int
on_mp_read_session_read_response_write2(struct query_state * qstate)48906a99fe3SHajimu UMEMOTO on_mp_read_session_read_response_write2(struct query_state *qstate)
49006a99fe3SHajimu UMEMOTO {
49106a99fe3SHajimu UMEMOTO 	struct cache_mp_read_session_read_response *read_response;
49206a99fe3SHajimu UMEMOTO 	ssize_t	result;
49306a99fe3SHajimu UMEMOTO 
49406a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_read_response_write2);
49506a99fe3SHajimu UMEMOTO 	read_response = get_cache_mp_read_session_read_response(
49606a99fe3SHajimu UMEMOTO 		&qstate->response);
49706a99fe3SHajimu UMEMOTO 	result = qstate->write_func(qstate, read_response->data,
49806a99fe3SHajimu UMEMOTO 		read_response->data_size);
49951d6ddb5SDag-Erling Smørgrav 	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
50006a99fe3SHajimu UMEMOTO 		LOG_ERR_3("on_mp_read_session_read_response_write2",
50106a99fe3SHajimu UMEMOTO 			"write failed");
50206a99fe3SHajimu UMEMOTO 		TRACE_OUT(on_mp_read_session_read_response_write2);
50306a99fe3SHajimu UMEMOTO 		return (-1);
50406a99fe3SHajimu UMEMOTO 	}
50506a99fe3SHajimu UMEMOTO 
50606a99fe3SHajimu UMEMOTO 	finalize_comm_element(&qstate->request);
50706a99fe3SHajimu UMEMOTO 	finalize_comm_element(&qstate->response);
50806a99fe3SHajimu UMEMOTO 
50906a99fe3SHajimu UMEMOTO 	qstate->kevent_watermark = sizeof(int);
51006a99fe3SHajimu UMEMOTO 	qstate->process_func = on_mp_read_session_mapper;
51106a99fe3SHajimu UMEMOTO 	qstate->kevent_filter = EVFILT_READ;
51206a99fe3SHajimu UMEMOTO 
51306a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_read_response_write2);
51406a99fe3SHajimu UMEMOTO 	return (0);
51506a99fe3SHajimu UMEMOTO }
51606a99fe3SHajimu UMEMOTO 
51706a99fe3SHajimu UMEMOTO /*
51806a99fe3SHajimu UMEMOTO  * Handles session close notification by calling close_cache_mp_read_session
51906a99fe3SHajimu UMEMOTO  * function.
52006a99fe3SHajimu UMEMOTO  */
52106a99fe3SHajimu UMEMOTO static int
on_mp_read_session_close_notification(struct query_state * qstate)52206a99fe3SHajimu UMEMOTO on_mp_read_session_close_notification(struct query_state *qstate)
52306a99fe3SHajimu UMEMOTO {
52406a99fe3SHajimu UMEMOTO 
52506a99fe3SHajimu UMEMOTO 	TRACE_IN(on_mp_read_session_close_notification);
52606a99fe3SHajimu UMEMOTO 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
52706a99fe3SHajimu UMEMOTO 	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
52806a99fe3SHajimu UMEMOTO 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
52906a99fe3SHajimu UMEMOTO 	qstate->mdata = NULL;
53006a99fe3SHajimu UMEMOTO 	qstate->kevent_watermark = 0;
53106a99fe3SHajimu UMEMOTO 	qstate->process_func = NULL;
53206a99fe3SHajimu UMEMOTO 	TRACE_OUT(on_mp_read_session_close_notification);
53306a99fe3SHajimu UMEMOTO 	return (0);
53406a99fe3SHajimu UMEMOTO }
535