106a99fe3SHajimu UMEMOTO /*-
206a99fe3SHajimu UMEMOTO * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
306a99fe3SHajimu UMEMOTO * All rights reserved.
406a99fe3SHajimu UMEMOTO *
506a99fe3SHajimu UMEMOTO * Redistribution and use in source and binary forms, with or without
606a99fe3SHajimu UMEMOTO * modification, are permitted provided that the following conditions
706a99fe3SHajimu UMEMOTO * are met:
806a99fe3SHajimu UMEMOTO * 1. Redistributions of source code must retain the above copyright
906a99fe3SHajimu UMEMOTO * notice, this list of conditions and the following disclaimer.
1006a99fe3SHajimu UMEMOTO * 2. Redistributions in binary form must reproduce the above copyright
1106a99fe3SHajimu UMEMOTO * notice, this list of conditions and the following disclaimer in the
1206a99fe3SHajimu UMEMOTO * documentation and/or other materials provided with the distribution.
1306a99fe3SHajimu UMEMOTO *
1406a99fe3SHajimu UMEMOTO * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
1506a99fe3SHajimu UMEMOTO * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1606a99fe3SHajimu UMEMOTO * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1706a99fe3SHajimu UMEMOTO * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
1806a99fe3SHajimu UMEMOTO * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
1906a99fe3SHajimu UMEMOTO * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2006a99fe3SHajimu UMEMOTO * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2106a99fe3SHajimu UMEMOTO * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2206a99fe3SHajimu UMEMOTO * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2306a99fe3SHajimu UMEMOTO * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2406a99fe3SHajimu UMEMOTO * SUCH DAMAGE.
2506a99fe3SHajimu UMEMOTO *
2606a99fe3SHajimu UMEMOTO */
2706a99fe3SHajimu UMEMOTO
2806a99fe3SHajimu UMEMOTO #include <sys/types.h>
2906a99fe3SHajimu UMEMOTO #include <sys/event.h>
30a5a5d924SDag-Erling Smørgrav #include <sys/socket.h>
31a5a5d924SDag-Erling Smørgrav #include <sys/time.h>
32a5a5d924SDag-Erling Smørgrav
3306a99fe3SHajimu UMEMOTO #include <assert.h>
3406a99fe3SHajimu UMEMOTO #include <errno.h>
35a5a5d924SDag-Erling Smørgrav #include <stdio.h>
3606a99fe3SHajimu UMEMOTO #include <stdlib.h>
3706a99fe3SHajimu UMEMOTO #include <string.h>
3806a99fe3SHajimu UMEMOTO
3906a99fe3SHajimu UMEMOTO #include "cachelib.h"
4006a99fe3SHajimu UMEMOTO #include "config.h"
4106a99fe3SHajimu UMEMOTO #include "debug.h"
4206a99fe3SHajimu UMEMOTO #include "log.h"
4306a99fe3SHajimu UMEMOTO #include "query.h"
4406a99fe3SHajimu UMEMOTO #include "mp_ws_query.h"
4506a99fe3SHajimu UMEMOTO #include "singletons.h"
4606a99fe3SHajimu UMEMOTO
4706a99fe3SHajimu UMEMOTO static int on_mp_write_session_abandon_notification(struct query_state *);
4806a99fe3SHajimu UMEMOTO static int on_mp_write_session_close_notification(struct query_state *);
4906a99fe3SHajimu UMEMOTO static void on_mp_write_session_destroy(struct query_state *);
5006a99fe3SHajimu UMEMOTO static int on_mp_write_session_mapper(struct query_state *);
5106a99fe3SHajimu UMEMOTO /* int on_mp_write_session_request_read1(struct query_state *); */
5206a99fe3SHajimu UMEMOTO static int on_mp_write_session_request_read2(struct query_state *);
5306a99fe3SHajimu UMEMOTO static int on_mp_write_session_request_process(struct query_state *);
5406a99fe3SHajimu UMEMOTO static int on_mp_write_session_response_write1(struct query_state *);
5506a99fe3SHajimu UMEMOTO static int on_mp_write_session_write_request_read1(struct query_state *);
5606a99fe3SHajimu UMEMOTO static int on_mp_write_session_write_request_read2(struct query_state *);
5706a99fe3SHajimu UMEMOTO static int on_mp_write_session_write_request_process(struct query_state *);
5806a99fe3SHajimu UMEMOTO static int on_mp_write_session_write_response_write1(struct query_state *);
5906a99fe3SHajimu UMEMOTO
6006a99fe3SHajimu UMEMOTO /*
6106a99fe3SHajimu UMEMOTO * This function is used as the query_state's destroy_func to make the
6206a99fe3SHajimu UMEMOTO * proper cleanup in case of errors.
6306a99fe3SHajimu UMEMOTO */
6406a99fe3SHajimu UMEMOTO static void
on_mp_write_session_destroy(struct query_state * qstate)6506a99fe3SHajimu UMEMOTO on_mp_write_session_destroy(struct query_state *qstate)
6606a99fe3SHajimu UMEMOTO {
6706a99fe3SHajimu UMEMOTO
6806a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_destroy);
6906a99fe3SHajimu UMEMOTO finalize_comm_element(&qstate->request);
7006a99fe3SHajimu UMEMOTO finalize_comm_element(&qstate->response);
7106a99fe3SHajimu UMEMOTO
7206a99fe3SHajimu UMEMOTO if (qstate->mdata != NULL) {
7306a99fe3SHajimu UMEMOTO configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
7406a99fe3SHajimu UMEMOTO abandon_cache_mp_write_session(
7506a99fe3SHajimu UMEMOTO (cache_mp_write_session)qstate->mdata);
7606a99fe3SHajimu UMEMOTO configuration_unlock_entry(qstate->config_entry,
7706a99fe3SHajimu UMEMOTO CELT_MULTIPART);
7806a99fe3SHajimu UMEMOTO }
7906a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_destroy);
8006a99fe3SHajimu UMEMOTO }
8106a99fe3SHajimu UMEMOTO
8206a99fe3SHajimu UMEMOTO /*
8306a99fe3SHajimu UMEMOTO * The functions below are used to process multipart write session initiation
8406a99fe3SHajimu UMEMOTO * requests.
8506a99fe3SHajimu UMEMOTO * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2
8606a99fe3SHajimu UMEMOTO * read the request itself
8706a99fe3SHajimu UMEMOTO * - on_mp_write_session_request_process processes it
8806a99fe3SHajimu UMEMOTO * - on_mp_write_session_response_write1 sends the response
8906a99fe3SHajimu UMEMOTO */
9006a99fe3SHajimu UMEMOTO int
on_mp_write_session_request_read1(struct query_state * qstate)9106a99fe3SHajimu UMEMOTO on_mp_write_session_request_read1(struct query_state *qstate)
9206a99fe3SHajimu UMEMOTO {
9306a99fe3SHajimu UMEMOTO struct cache_mp_write_session_request *c_mp_ws_request;
9406a99fe3SHajimu UMEMOTO ssize_t result;
9506a99fe3SHajimu UMEMOTO
9606a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_request_read1);
9706a99fe3SHajimu UMEMOTO if (qstate->kevent_watermark == 0)
9806a99fe3SHajimu UMEMOTO qstate->kevent_watermark = sizeof(size_t);
9906a99fe3SHajimu UMEMOTO else {
10006a99fe3SHajimu UMEMOTO init_comm_element(&qstate->request,
10106a99fe3SHajimu UMEMOTO CET_MP_WRITE_SESSION_REQUEST);
10206a99fe3SHajimu UMEMOTO c_mp_ws_request = get_cache_mp_write_session_request(
10306a99fe3SHajimu UMEMOTO &qstate->request);
10406a99fe3SHajimu UMEMOTO
10506a99fe3SHajimu UMEMOTO result = qstate->read_func(qstate,
10606a99fe3SHajimu UMEMOTO &c_mp_ws_request->entry_length, sizeof(size_t));
10706a99fe3SHajimu UMEMOTO
10806a99fe3SHajimu UMEMOTO if (result != sizeof(size_t)) {
10906a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_request_read1",
11006a99fe3SHajimu UMEMOTO "read failed");
11106a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_request_read1);
11206a99fe3SHajimu UMEMOTO return (-1);
11306a99fe3SHajimu UMEMOTO }
11406a99fe3SHajimu UMEMOTO
11506a99fe3SHajimu UMEMOTO if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) {
11606a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_request_read1",
11706a99fe3SHajimu UMEMOTO "invalid entry_length value");
11806a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_request_read1);
11906a99fe3SHajimu UMEMOTO return (-1);
12006a99fe3SHajimu UMEMOTO }
12106a99fe3SHajimu UMEMOTO
122*8eeaaffaSDag-Erling Smørgrav c_mp_ws_request->entry = calloc(1,
12306a99fe3SHajimu UMEMOTO c_mp_ws_request->entry_length + 1);
12406a99fe3SHajimu UMEMOTO assert(c_mp_ws_request->entry != NULL);
12506a99fe3SHajimu UMEMOTO
12606a99fe3SHajimu UMEMOTO qstate->kevent_watermark = c_mp_ws_request->entry_length;
12706a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_request_read2;
12806a99fe3SHajimu UMEMOTO }
12906a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_request_read1);
13006a99fe3SHajimu UMEMOTO return (0);
13106a99fe3SHajimu UMEMOTO }
13206a99fe3SHajimu UMEMOTO
13306a99fe3SHajimu UMEMOTO static int
on_mp_write_session_request_read2(struct query_state * qstate)13406a99fe3SHajimu UMEMOTO on_mp_write_session_request_read2(struct query_state *qstate)
13506a99fe3SHajimu UMEMOTO {
13606a99fe3SHajimu UMEMOTO struct cache_mp_write_session_request *c_mp_ws_request;
13706a99fe3SHajimu UMEMOTO ssize_t result;
13806a99fe3SHajimu UMEMOTO
13906a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_request_read2);
14006a99fe3SHajimu UMEMOTO c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
14106a99fe3SHajimu UMEMOTO
14206a99fe3SHajimu UMEMOTO result = qstate->read_func(qstate, c_mp_ws_request->entry,
14306a99fe3SHajimu UMEMOTO c_mp_ws_request->entry_length);
14406a99fe3SHajimu UMEMOTO
14551d6ddb5SDag-Erling Smørgrav if (result < 0 || (size_t)result != qstate->kevent_watermark) {
14606a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_request_read2",
14706a99fe3SHajimu UMEMOTO "read failed");
14806a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_request_read2);
14906a99fe3SHajimu UMEMOTO return (-1);
15006a99fe3SHajimu UMEMOTO }
15106a99fe3SHajimu UMEMOTO
15206a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
15306a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_request_process;
15406a99fe3SHajimu UMEMOTO
15506a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_request_read2);
15606a99fe3SHajimu UMEMOTO return (0);
15706a99fe3SHajimu UMEMOTO }
15806a99fe3SHajimu UMEMOTO
15906a99fe3SHajimu UMEMOTO static int
on_mp_write_session_request_process(struct query_state * qstate)16006a99fe3SHajimu UMEMOTO on_mp_write_session_request_process(struct query_state *qstate)
16106a99fe3SHajimu UMEMOTO {
16206a99fe3SHajimu UMEMOTO struct cache_mp_write_session_request *c_mp_ws_request;
16306a99fe3SHajimu UMEMOTO struct cache_mp_write_session_response *c_mp_ws_response;
16406a99fe3SHajimu UMEMOTO cache_mp_write_session ws;
16506a99fe3SHajimu UMEMOTO cache_entry c_entry;
16606a99fe3SHajimu UMEMOTO char *dec_cache_entry_name;
16706a99fe3SHajimu UMEMOTO
16806a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_request_process);
16906a99fe3SHajimu UMEMOTO init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE);
17006a99fe3SHajimu UMEMOTO c_mp_ws_response = get_cache_mp_write_session_response(
17106a99fe3SHajimu UMEMOTO &qstate->response);
17206a99fe3SHajimu UMEMOTO c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
17306a99fe3SHajimu UMEMOTO
17406a99fe3SHajimu UMEMOTO qstate->config_entry = configuration_find_entry(
17506a99fe3SHajimu UMEMOTO s_configuration, c_mp_ws_request->entry);
17606a99fe3SHajimu UMEMOTO if (qstate->config_entry == NULL) {
17706a99fe3SHajimu UMEMOTO c_mp_ws_response->error_code = ENOENT;
17806a99fe3SHajimu UMEMOTO
17906a99fe3SHajimu UMEMOTO LOG_ERR_2("write_session_request",
18006a99fe3SHajimu UMEMOTO "can't find configuration entry '%s'. "
18106a99fe3SHajimu UMEMOTO "aborting request", c_mp_ws_request->entry);
18206a99fe3SHajimu UMEMOTO goto fin;
18306a99fe3SHajimu UMEMOTO }
18406a99fe3SHajimu UMEMOTO
18506a99fe3SHajimu UMEMOTO if (qstate->config_entry->enabled == 0) {
18606a99fe3SHajimu UMEMOTO c_mp_ws_response->error_code = EACCES;
18706a99fe3SHajimu UMEMOTO
18806a99fe3SHajimu UMEMOTO LOG_ERR_2("write_session_request",
18906a99fe3SHajimu UMEMOTO "configuration entry '%s' is disabled",
19006a99fe3SHajimu UMEMOTO c_mp_ws_request->entry);
19106a99fe3SHajimu UMEMOTO goto fin;
19206a99fe3SHajimu UMEMOTO }
19306a99fe3SHajimu UMEMOTO
19406a99fe3SHajimu UMEMOTO if (qstate->config_entry->perform_actual_lookups != 0) {
19506a99fe3SHajimu UMEMOTO c_mp_ws_response->error_code = EOPNOTSUPP;
19606a99fe3SHajimu UMEMOTO
19706a99fe3SHajimu UMEMOTO LOG_ERR_2("write_session_request",
19806a99fe3SHajimu UMEMOTO "entry '%s' performs lookups by itself: "
19906a99fe3SHajimu UMEMOTO "can't write to it", c_mp_ws_request->entry);
20006a99fe3SHajimu UMEMOTO goto fin;
20106a99fe3SHajimu UMEMOTO } else {
202db1bdf2bSMichael Bushkov #ifdef NS_NSCD_EID_CHECKING
20306a99fe3SHajimu UMEMOTO if (check_query_eids(qstate) != 0) {
20406a99fe3SHajimu UMEMOTO c_mp_ws_response->error_code = EPERM;
20506a99fe3SHajimu UMEMOTO goto fin;
20606a99fe3SHajimu UMEMOTO }
20706a99fe3SHajimu UMEMOTO #endif
20806a99fe3SHajimu UMEMOTO }
20906a99fe3SHajimu UMEMOTO
21006a99fe3SHajimu UMEMOTO /*
21106a99fe3SHajimu UMEMOTO * All multipart entries are separated by their name decorations.
21206a99fe3SHajimu UMEMOTO * For one configuration entry there will be a lot of multipart
21306a99fe3SHajimu UMEMOTO * cache entries - each with its own decorated name.
21406a99fe3SHajimu UMEMOTO */
21506a99fe3SHajimu UMEMOTO asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
21627f2bc9eSDag-Erling Smørgrav qstate->config_entry->mp_cache_params.cep.entry_name);
21706a99fe3SHajimu UMEMOTO assert(dec_cache_entry_name != NULL);
21806a99fe3SHajimu UMEMOTO
21906a99fe3SHajimu UMEMOTO configuration_lock_rdlock(s_configuration);
22006a99fe3SHajimu UMEMOTO c_entry = find_cache_entry(s_cache,
22106a99fe3SHajimu UMEMOTO dec_cache_entry_name);
22206a99fe3SHajimu UMEMOTO configuration_unlock(s_configuration);
22306a99fe3SHajimu UMEMOTO
22406a99fe3SHajimu UMEMOTO if (c_entry == INVALID_CACHE_ENTRY)
22506a99fe3SHajimu UMEMOTO c_entry = register_new_mp_cache_entry(qstate,
22606a99fe3SHajimu UMEMOTO dec_cache_entry_name);
22706a99fe3SHajimu UMEMOTO
22806a99fe3SHajimu UMEMOTO free(dec_cache_entry_name);
22906a99fe3SHajimu UMEMOTO
23006a99fe3SHajimu UMEMOTO assert(c_entry != NULL);
23106a99fe3SHajimu UMEMOTO configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
23206a99fe3SHajimu UMEMOTO ws = open_cache_mp_write_session(c_entry);
23306a99fe3SHajimu UMEMOTO if (ws == INVALID_CACHE_MP_WRITE_SESSION)
23406a99fe3SHajimu UMEMOTO c_mp_ws_response->error_code = -1;
23506a99fe3SHajimu UMEMOTO else {
23606a99fe3SHajimu UMEMOTO qstate->mdata = ws;
23706a99fe3SHajimu UMEMOTO qstate->destroy_func = on_mp_write_session_destroy;
23806a99fe3SHajimu UMEMOTO
23906a99fe3SHajimu UMEMOTO if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
24006a99fe3SHajimu UMEMOTO (qstate->config_entry->mp_query_timeout.tv_usec != 0))
24106a99fe3SHajimu UMEMOTO memcpy(&qstate->timeout,
24206a99fe3SHajimu UMEMOTO &qstate->config_entry->mp_query_timeout,
24306a99fe3SHajimu UMEMOTO sizeof(struct timeval));
24406a99fe3SHajimu UMEMOTO }
24506a99fe3SHajimu UMEMOTO configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
24606a99fe3SHajimu UMEMOTO
24706a99fe3SHajimu UMEMOTO fin:
24806a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_response_write1;
24906a99fe3SHajimu UMEMOTO qstate->kevent_watermark = sizeof(int);
25006a99fe3SHajimu UMEMOTO qstate->kevent_filter = EVFILT_WRITE;
25106a99fe3SHajimu UMEMOTO
25206a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_request_process);
25306a99fe3SHajimu UMEMOTO return (0);
25406a99fe3SHajimu UMEMOTO }
25506a99fe3SHajimu UMEMOTO
25606a99fe3SHajimu UMEMOTO static int
on_mp_write_session_response_write1(struct query_state * qstate)25706a99fe3SHajimu UMEMOTO on_mp_write_session_response_write1(struct query_state *qstate)
25806a99fe3SHajimu UMEMOTO {
25906a99fe3SHajimu UMEMOTO struct cache_mp_write_session_response *c_mp_ws_response;
26006a99fe3SHajimu UMEMOTO ssize_t result;
26106a99fe3SHajimu UMEMOTO
26206a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_response_write1);
26306a99fe3SHajimu UMEMOTO c_mp_ws_response = get_cache_mp_write_session_response(
26406a99fe3SHajimu UMEMOTO &qstate->response);
26506a99fe3SHajimu UMEMOTO result = qstate->write_func(qstate, &c_mp_ws_response->error_code,
26606a99fe3SHajimu UMEMOTO sizeof(int));
26706a99fe3SHajimu UMEMOTO if (result != sizeof(int)) {
26806a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_response_write1",
26906a99fe3SHajimu UMEMOTO "write failed");
27006a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_response_write1);
27106a99fe3SHajimu UMEMOTO return (-1);
27206a99fe3SHajimu UMEMOTO }
27306a99fe3SHajimu UMEMOTO
27406a99fe3SHajimu UMEMOTO if (c_mp_ws_response->error_code == 0) {
27506a99fe3SHajimu UMEMOTO qstate->kevent_watermark = sizeof(int);
27606a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_mapper;
27706a99fe3SHajimu UMEMOTO qstate->kevent_filter = EVFILT_READ;
27806a99fe3SHajimu UMEMOTO } else {
27906a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
28006a99fe3SHajimu UMEMOTO qstate->process_func = NULL;
28106a99fe3SHajimu UMEMOTO }
28206a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_response_write1);
28306a99fe3SHajimu UMEMOTO return (0);
28406a99fe3SHajimu UMEMOTO }
28506a99fe3SHajimu UMEMOTO
28606a99fe3SHajimu UMEMOTO /*
28706a99fe3SHajimu UMEMOTO * Mapper function is used to avoid multiple connections for each session
28806a99fe3SHajimu UMEMOTO * write or read requests. After processing the request, it does not close
28906a99fe3SHajimu UMEMOTO * the connection, but waits for the next request.
29006a99fe3SHajimu UMEMOTO */
29106a99fe3SHajimu UMEMOTO static int
on_mp_write_session_mapper(struct query_state * qstate)29206a99fe3SHajimu UMEMOTO on_mp_write_session_mapper(struct query_state *qstate)
29306a99fe3SHajimu UMEMOTO {
29406a99fe3SHajimu UMEMOTO ssize_t result;
29506a99fe3SHajimu UMEMOTO int elem_type;
29606a99fe3SHajimu UMEMOTO
29706a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_mapper);
29806a99fe3SHajimu UMEMOTO if (qstate->kevent_watermark == 0) {
29906a99fe3SHajimu UMEMOTO qstate->kevent_watermark = sizeof(int);
30006a99fe3SHajimu UMEMOTO } else {
30106a99fe3SHajimu UMEMOTO result = qstate->read_func(qstate, &elem_type, sizeof(int));
30206a99fe3SHajimu UMEMOTO if (result != sizeof(int)) {
30306a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_mapper",
30406a99fe3SHajimu UMEMOTO "read failed");
30506a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_mapper);
30606a99fe3SHajimu UMEMOTO return (-1);
30706a99fe3SHajimu UMEMOTO }
30806a99fe3SHajimu UMEMOTO
30906a99fe3SHajimu UMEMOTO switch (elem_type) {
31006a99fe3SHajimu UMEMOTO case CET_MP_WRITE_SESSION_WRITE_REQUEST:
31106a99fe3SHajimu UMEMOTO qstate->kevent_watermark = sizeof(size_t);
31206a99fe3SHajimu UMEMOTO qstate->process_func =
31306a99fe3SHajimu UMEMOTO on_mp_write_session_write_request_read1;
31406a99fe3SHajimu UMEMOTO break;
31506a99fe3SHajimu UMEMOTO case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION:
31606a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
31706a99fe3SHajimu UMEMOTO qstate->process_func =
31806a99fe3SHajimu UMEMOTO on_mp_write_session_abandon_notification;
31906a99fe3SHajimu UMEMOTO break;
32006a99fe3SHajimu UMEMOTO case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION:
32106a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
32206a99fe3SHajimu UMEMOTO qstate->process_func =
32306a99fe3SHajimu UMEMOTO on_mp_write_session_close_notification;
32406a99fe3SHajimu UMEMOTO break;
32506a99fe3SHajimu UMEMOTO default:
32606a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
32706a99fe3SHajimu UMEMOTO qstate->process_func = NULL;
32806a99fe3SHajimu UMEMOTO LOG_ERR_2("on_mp_write_session_mapper",
32906a99fe3SHajimu UMEMOTO "unknown element type");
33006a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_mapper);
33106a99fe3SHajimu UMEMOTO return (-1);
33206a99fe3SHajimu UMEMOTO }
33306a99fe3SHajimu UMEMOTO }
33406a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_mapper);
33506a99fe3SHajimu UMEMOTO return (0);
33606a99fe3SHajimu UMEMOTO }
33706a99fe3SHajimu UMEMOTO
33806a99fe3SHajimu UMEMOTO /*
33906a99fe3SHajimu UMEMOTO * The functions below are used to process multipart write sessions write
34006a99fe3SHajimu UMEMOTO * requests.
34106a99fe3SHajimu UMEMOTO * - on_mp_write_session_write_request_read1 and
34206a99fe3SHajimu UMEMOTO * on_mp_write_session_write_request_read2 read the request itself
34306a99fe3SHajimu UMEMOTO * - on_mp_write_session_write_request_process processes it
34406a99fe3SHajimu UMEMOTO * - on_mp_write_session_write_response_write1 sends the response
34506a99fe3SHajimu UMEMOTO */
34606a99fe3SHajimu UMEMOTO static int
on_mp_write_session_write_request_read1(struct query_state * qstate)34706a99fe3SHajimu UMEMOTO on_mp_write_session_write_request_read1(struct query_state *qstate)
34806a99fe3SHajimu UMEMOTO {
34906a99fe3SHajimu UMEMOTO struct cache_mp_write_session_write_request *write_request;
35006a99fe3SHajimu UMEMOTO ssize_t result;
35106a99fe3SHajimu UMEMOTO
35206a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_write_request_read1);
35306a99fe3SHajimu UMEMOTO init_comm_element(&qstate->request,
35406a99fe3SHajimu UMEMOTO CET_MP_WRITE_SESSION_WRITE_REQUEST);
35506a99fe3SHajimu UMEMOTO write_request = get_cache_mp_write_session_write_request(
35606a99fe3SHajimu UMEMOTO &qstate->request);
35706a99fe3SHajimu UMEMOTO
35806a99fe3SHajimu UMEMOTO result = qstate->read_func(qstate, &write_request->data_size,
35906a99fe3SHajimu UMEMOTO sizeof(size_t));
36006a99fe3SHajimu UMEMOTO
36106a99fe3SHajimu UMEMOTO if (result != sizeof(size_t)) {
36206a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_write_request_read1",
36306a99fe3SHajimu UMEMOTO "read failed");
36406a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_request_read1);
36506a99fe3SHajimu UMEMOTO return (-1);
36606a99fe3SHajimu UMEMOTO }
36706a99fe3SHajimu UMEMOTO
36806a99fe3SHajimu UMEMOTO if (BUFSIZE_INVALID(write_request->data_size)) {
36906a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_write_request_read1",
37006a99fe3SHajimu UMEMOTO "invalid data_size value");
37106a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_request_read1);
37206a99fe3SHajimu UMEMOTO return (-1);
37306a99fe3SHajimu UMEMOTO }
37406a99fe3SHajimu UMEMOTO
375*8eeaaffaSDag-Erling Smørgrav write_request->data = calloc(1, write_request->data_size);
37606a99fe3SHajimu UMEMOTO assert(write_request->data != NULL);
37706a99fe3SHajimu UMEMOTO
37806a99fe3SHajimu UMEMOTO qstate->kevent_watermark = write_request->data_size;
37906a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_write_request_read2;
38006a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_request_read1);
38106a99fe3SHajimu UMEMOTO return (0);
38206a99fe3SHajimu UMEMOTO }
38306a99fe3SHajimu UMEMOTO
38406a99fe3SHajimu UMEMOTO static int
on_mp_write_session_write_request_read2(struct query_state * qstate)38506a99fe3SHajimu UMEMOTO on_mp_write_session_write_request_read2(struct query_state *qstate)
38606a99fe3SHajimu UMEMOTO {
38706a99fe3SHajimu UMEMOTO struct cache_mp_write_session_write_request *write_request;
38806a99fe3SHajimu UMEMOTO ssize_t result;
38906a99fe3SHajimu UMEMOTO
39006a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_write_request_read2);
39106a99fe3SHajimu UMEMOTO write_request = get_cache_mp_write_session_write_request(
39206a99fe3SHajimu UMEMOTO &qstate->request);
39306a99fe3SHajimu UMEMOTO
39406a99fe3SHajimu UMEMOTO result = qstate->read_func(qstate, write_request->data,
39506a99fe3SHajimu UMEMOTO write_request->data_size);
39606a99fe3SHajimu UMEMOTO
39751d6ddb5SDag-Erling Smørgrav if (result < 0 || (size_t)result != qstate->kevent_watermark) {
39806a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_write_request_read2",
39906a99fe3SHajimu UMEMOTO "read failed");
40006a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_request_read2);
40106a99fe3SHajimu UMEMOTO return (-1);
40206a99fe3SHajimu UMEMOTO }
40306a99fe3SHajimu UMEMOTO
40406a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
40506a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_write_request_process;
40606a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_request_read2);
40706a99fe3SHajimu UMEMOTO return (0);
40806a99fe3SHajimu UMEMOTO }
40906a99fe3SHajimu UMEMOTO
41006a99fe3SHajimu UMEMOTO static int
on_mp_write_session_write_request_process(struct query_state * qstate)41106a99fe3SHajimu UMEMOTO on_mp_write_session_write_request_process(struct query_state *qstate)
41206a99fe3SHajimu UMEMOTO {
41306a99fe3SHajimu UMEMOTO struct cache_mp_write_session_write_request *write_request;
41406a99fe3SHajimu UMEMOTO struct cache_mp_write_session_write_response *write_response;
41506a99fe3SHajimu UMEMOTO
41606a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_write_request_process);
41706a99fe3SHajimu UMEMOTO init_comm_element(&qstate->response,
41806a99fe3SHajimu UMEMOTO CET_MP_WRITE_SESSION_WRITE_RESPONSE);
41906a99fe3SHajimu UMEMOTO write_response = get_cache_mp_write_session_write_response(
42006a99fe3SHajimu UMEMOTO &qstate->response);
42106a99fe3SHajimu UMEMOTO write_request = get_cache_mp_write_session_write_request(
42206a99fe3SHajimu UMEMOTO &qstate->request);
42306a99fe3SHajimu UMEMOTO
42406a99fe3SHajimu UMEMOTO configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
42506a99fe3SHajimu UMEMOTO write_response->error_code = cache_mp_write(
42606a99fe3SHajimu UMEMOTO (cache_mp_write_session)qstate->mdata,
42706a99fe3SHajimu UMEMOTO write_request->data,
42806a99fe3SHajimu UMEMOTO write_request->data_size);
42906a99fe3SHajimu UMEMOTO configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
43006a99fe3SHajimu UMEMOTO
43106a99fe3SHajimu UMEMOTO qstate->kevent_watermark = sizeof(int);
43206a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_write_response_write1;
43306a99fe3SHajimu UMEMOTO qstate->kevent_filter = EVFILT_WRITE;
43406a99fe3SHajimu UMEMOTO
43506a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_request_process);
43606a99fe3SHajimu UMEMOTO return (0);
43706a99fe3SHajimu UMEMOTO }
43806a99fe3SHajimu UMEMOTO
43906a99fe3SHajimu UMEMOTO static int
on_mp_write_session_write_response_write1(struct query_state * qstate)44006a99fe3SHajimu UMEMOTO on_mp_write_session_write_response_write1(struct query_state *qstate)
44106a99fe3SHajimu UMEMOTO {
44206a99fe3SHajimu UMEMOTO struct cache_mp_write_session_write_response *write_response;
44306a99fe3SHajimu UMEMOTO ssize_t result;
44406a99fe3SHajimu UMEMOTO
44506a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_write_response_write1);
44606a99fe3SHajimu UMEMOTO write_response = get_cache_mp_write_session_write_response(
44706a99fe3SHajimu UMEMOTO &qstate->response);
44806a99fe3SHajimu UMEMOTO result = qstate->write_func(qstate, &write_response->error_code,
44906a99fe3SHajimu UMEMOTO sizeof(int));
45006a99fe3SHajimu UMEMOTO if (result != sizeof(int)) {
45106a99fe3SHajimu UMEMOTO LOG_ERR_3("on_mp_write_session_write_response_write1",
45206a99fe3SHajimu UMEMOTO "write failed");
45306a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_response_write1);
45406a99fe3SHajimu UMEMOTO return (-1);
45506a99fe3SHajimu UMEMOTO }
45606a99fe3SHajimu UMEMOTO
45706a99fe3SHajimu UMEMOTO if (write_response->error_code == 0) {
45806a99fe3SHajimu UMEMOTO finalize_comm_element(&qstate->request);
45906a99fe3SHajimu UMEMOTO finalize_comm_element(&qstate->response);
46006a99fe3SHajimu UMEMOTO
46106a99fe3SHajimu UMEMOTO qstate->kevent_watermark = sizeof(int);
46206a99fe3SHajimu UMEMOTO qstate->process_func = on_mp_write_session_mapper;
46306a99fe3SHajimu UMEMOTO qstate->kevent_filter = EVFILT_READ;
46406a99fe3SHajimu UMEMOTO } else {
46506a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
46606a99fe3SHajimu UMEMOTO qstate->process_func = 0;
46706a99fe3SHajimu UMEMOTO }
46806a99fe3SHajimu UMEMOTO
46906a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_write_response_write1);
47006a99fe3SHajimu UMEMOTO return (0);
47106a99fe3SHajimu UMEMOTO }
47206a99fe3SHajimu UMEMOTO
47306a99fe3SHajimu UMEMOTO /*
47406a99fe3SHajimu UMEMOTO * Handles abandon notifications. Destroys the session by calling the
47506a99fe3SHajimu UMEMOTO * abandon_cache_mp_write_session.
47606a99fe3SHajimu UMEMOTO */
47706a99fe3SHajimu UMEMOTO static int
on_mp_write_session_abandon_notification(struct query_state * qstate)47806a99fe3SHajimu UMEMOTO on_mp_write_session_abandon_notification(struct query_state *qstate)
47906a99fe3SHajimu UMEMOTO {
48006a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_abandon_notification);
48106a99fe3SHajimu UMEMOTO configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
48206a99fe3SHajimu UMEMOTO abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
48306a99fe3SHajimu UMEMOTO configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
48406a99fe3SHajimu UMEMOTO qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
48506a99fe3SHajimu UMEMOTO
48606a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
48706a99fe3SHajimu UMEMOTO qstate->process_func = NULL;
48806a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_abandon_notification);
48906a99fe3SHajimu UMEMOTO return (0);
49006a99fe3SHajimu UMEMOTO }
49106a99fe3SHajimu UMEMOTO
49206a99fe3SHajimu UMEMOTO /*
49306a99fe3SHajimu UMEMOTO * Handles close notifications. Commits the session by calling
49406a99fe3SHajimu UMEMOTO * the close_cache_mp_write_session.
49506a99fe3SHajimu UMEMOTO */
49606a99fe3SHajimu UMEMOTO static int
on_mp_write_session_close_notification(struct query_state * qstate)49706a99fe3SHajimu UMEMOTO on_mp_write_session_close_notification(struct query_state *qstate)
49806a99fe3SHajimu UMEMOTO {
49906a99fe3SHajimu UMEMOTO TRACE_IN(on_mp_write_session_close_notification);
50006a99fe3SHajimu UMEMOTO configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
50106a99fe3SHajimu UMEMOTO close_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
50206a99fe3SHajimu UMEMOTO configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
50306a99fe3SHajimu UMEMOTO qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
50406a99fe3SHajimu UMEMOTO
50506a99fe3SHajimu UMEMOTO qstate->kevent_watermark = 0;
50606a99fe3SHajimu UMEMOTO qstate->process_func = NULL;
50706a99fe3SHajimu UMEMOTO TRACE_OUT(on_mp_write_session_close_notification);
50806a99fe3SHajimu UMEMOTO return (0);
50906a99fe3SHajimu UMEMOTO }
51006a99fe3SHajimu UMEMOTO
register_new_mp_cache_entry(struct query_state * qstate,const char * dec_cache_entry_name)51106a99fe3SHajimu UMEMOTO cache_entry register_new_mp_cache_entry(struct query_state *qstate,
51206a99fe3SHajimu UMEMOTO const char *dec_cache_entry_name)
51306a99fe3SHajimu UMEMOTO {
51406a99fe3SHajimu UMEMOTO cache_entry c_entry;
51506a99fe3SHajimu UMEMOTO char *en_bkp;
51606a99fe3SHajimu UMEMOTO
51706a99fe3SHajimu UMEMOTO TRACE_IN(register_new_mp_cache_entry);
51806a99fe3SHajimu UMEMOTO c_entry = INVALID_CACHE_ENTRY;
51906a99fe3SHajimu UMEMOTO configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
52006a99fe3SHajimu UMEMOTO
52106a99fe3SHajimu UMEMOTO configuration_lock_wrlock(s_configuration);
52227f2bc9eSDag-Erling Smørgrav en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name;
52327f2bc9eSDag-Erling Smørgrav qstate->config_entry->mp_cache_params.cep.entry_name =
52406a99fe3SHajimu UMEMOTO (char *)dec_cache_entry_name;
52506a99fe3SHajimu UMEMOTO register_cache_entry(s_cache, (struct cache_entry_params *)
52606a99fe3SHajimu UMEMOTO &qstate->config_entry->mp_cache_params);
52727f2bc9eSDag-Erling Smørgrav qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp;
52806a99fe3SHajimu UMEMOTO configuration_unlock(s_configuration);
52906a99fe3SHajimu UMEMOTO
53006a99fe3SHajimu UMEMOTO configuration_lock_rdlock(s_configuration);
53106a99fe3SHajimu UMEMOTO c_entry = find_cache_entry(s_cache,
53206a99fe3SHajimu UMEMOTO dec_cache_entry_name);
53306a99fe3SHajimu UMEMOTO configuration_unlock(s_configuration);
53406a99fe3SHajimu UMEMOTO
53506a99fe3SHajimu UMEMOTO configuration_entry_add_mp_cache_entry(qstate->config_entry,
53606a99fe3SHajimu UMEMOTO c_entry);
53706a99fe3SHajimu UMEMOTO
53806a99fe3SHajimu UMEMOTO configuration_unlock_entry(qstate->config_entry,
53906a99fe3SHajimu UMEMOTO CELT_MULTIPART);
54006a99fe3SHajimu UMEMOTO
54106a99fe3SHajimu UMEMOTO TRACE_OUT(register_new_mp_cache_entry);
54206a99fe3SHajimu UMEMOTO return (c_entry);
54306a99fe3SHajimu UMEMOTO }
544