xref: /netbsd-src/lib/libisns/isns_thread.c (revision 946379e7b37692fc43f68eb0d1c10daa0a7f3b6c)
1 /*	$NetBSD: isns_thread.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $	*/
2 
3 /*-
4  * Copyright (c) 2004,2009 The NetBSD Foundation, Inc.
5  * All rights reserved.
6  *
7  * This code is derived from software contributed to The NetBSD Foundation
8  * by Wasabi Systems, Inc.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 #include <sys/cdefs.h>
33 __RCSID("$NetBSD: isns_thread.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $");
34 
35 
36 /*
37  * isns_thread.c
38  */
39 
40 #include <sys/types.h>
41 
42 #include <unistd.h>
43 
44 #include "isns.h"
45 #include "isns_config.h"
46 #include "isns_defs.h"
47 
48 static struct iovec read_buf[2 + (ISNS_MAX_PDU_PAYLOAD / ISNS_BUF_SIZE) +
49     ((ISNS_MAX_PDU_PAYLOAD % ISNS_BUF_SIZE) != 0)];
50 
51 static struct isns_task_s *isns_get_next_task(struct isns_config_s *);
52 
53 /*
54  * isns_control_thread()
55  */
56 void *
57 isns_control_thread(void *arg)
58 {
59 	struct isns_config_s *cfg_p = (struct isns_config_s *)arg;
60 	struct kevent evt_chgs[5], *evt_p;
61 
62 	int n, nevents;
63 	isns_kevent_handler *evt_handler_p;
64 	int run_thread;
65 
66 	run_thread = 1;
67 
68 	while (run_thread) {
69 		/* if no task outstanding, check queue here and send PDU */
70 		while ((cfg_p->curtask_p == NULL)
71 		    && ((cfg_p->curtask_p = isns_get_next_task(cfg_p)) != NULL))
72 			isns_run_task(cfg_p->curtask_p);
73 
74 		nevents = kevent(cfg_p->kq, NULL, 0, evt_chgs,
75 		    ARRAY_ELEMS(evt_chgs), NULL);
76 
77 		DBG("isns_control_thread: kevent() nevents=%d\n", nevents);
78 
79 		for (n = 0, evt_p = evt_chgs; n < nevents; n++, evt_p++) {
80 			DBG("event[%d] - data=%d\n", n, (int)evt_p->data);
81 			evt_handler_p = (void *)evt_p->udata;
82 			run_thread = (evt_handler_p(evt_p, cfg_p) == 0);
83 		}
84 	}
85 
86 	return 0;
87 }
88 
89 /*
90  * isns_get_next_task()
91  */
92 static struct isns_task_s *
93 isns_get_next_task(struct isns_config_s *cfg_p)
94 {
95 	struct isns_task_s *task_p = NULL;
96 
97 
98 	DBG("isns_get_next_task: entered\n");
99 
100 	task_p = isns_taskq_remove(cfg_p);
101 
102 	if (cfg_p->sd_connected)
103 		return task_p;
104 	else {
105 		if (task_p == NULL)
106 			return NULL;
107 		else {
108 			if (task_p->task_type != ISNS_TASK_INIT_SOCKET_IO) {
109 				isns_taskq_insert_head(cfg_p, task_p);
110 
111 				task_p = isns_new_task(cfg_p,
112 				    ISNS_TASK_RECONNECT_SERVER, 0);
113 				task_p->var.reconnect_server.ai_p = cfg_p->ai_p;
114 			}
115 
116 			return task_p;
117 		}
118 	}
119 }
120 
121 /*
122  * isns_kevent_pipe()
123  */
124 int
125 isns_kevent_pipe(struct kevent* evt_p, struct isns_config_s *cfg_p)
126 {
127 	uint8_t cmd_type;
128 	int force_isns_stop;
129 	uint16_t trans_id;
130 	ssize_t rbytes;
131 	int pipe_nbytes;
132 
133 	force_isns_stop = 0;
134 	pipe_nbytes = (int)evt_p->data;
135 
136 	while (pipe_nbytes > 0) {
137 		rbytes = read(cfg_p->pipe_fds[0], &cmd_type,
138 		    sizeof(cmd_type));
139 		if (rbytes < 0) {
140 			DBG("isns_kevent_pipe: error on wepe_sys_read\n");
141 			/*?? should we break here? */
142 			continue;
143 		}
144 
145 		pipe_nbytes -= (int)rbytes;
146 		switch (cmd_type) {
147 		case ISNS_CMD_PROCESS_TASKQ:
148 			DBG("isns_kevent_pipe: ISNS_CMD_PROCESS_TASKQ\n");
149 			break;
150 
151 		case ISNS_CMD_ABORT_TRANS:
152 			DBG("isns_kevent_pipe: ISNS_CMD_ABORT_TRANS\n");
153 			rbytes = read(cfg_p->pipe_fds[0], &trans_id,
154 			    sizeof(trans_id));
155 			if ((rbytes < 0) && (rbytes == sizeof(trans_id)))
156 				isns_abort_trans(cfg_p, trans_id);
157 			else
158 				DBG("isns_kevent_pipe: "
159 				    "error reading trans id\n");
160 			pipe_nbytes -= (int)rbytes;
161 			break;
162 
163 		case ISNS_CMD_STOP:
164 			DBG("isns_kevent_pipe: ISNS_CMD_STOP\n");
165 			force_isns_stop = 1;
166 			pipe_nbytes = 0;
167 			break;
168 
169 		default:
170 			DBG("isns_kevent_pipe: unknown command (cmd=%d)\n",
171 			    cmd_type);
172 			break;
173 		}
174 	}
175 
176 	return (force_isns_stop ? 1 : 0);
177 }
178 
179 /*
180  * isns_is_trans_complete()
181  */
182 static int
183 isns_is_trans_complete(struct isns_trans_s *trans_p)
184 {
185 	struct isns_pdu_s *pdu_p;
186 	uint16_t count;
187 
188 	pdu_p = trans_p->pdu_rsp_list;
189 	count = 0;
190 	while (pdu_p->next != NULL) {
191 		if (pdu_p->hdr.seq_id != count++) return 0;
192 		pdu_p = pdu_p->next;
193 	}
194 	if ((pdu_p->hdr.seq_id != count) ||
195 	    !(pdu_p->hdr.flags & ISNS_FLAG_LAST_PDU))
196 		return 0;
197 
198 	return 1;
199 }
200 
201 /*
202  * isns_is_valid_resp()
203  */
204 static int
205 isns_is_valid_resp(struct isns_trans_s *trans_p, struct isns_pdu_s *pdu_p)
206 {
207 	struct isns_pdu_s *curpdu_p;
208 
209 	if (pdu_p->hdr.trans_id != trans_p->id)
210 		return 0;
211 	if (pdu_p->hdr.func_id != (trans_p->func_id | 0x8000))
212 		return 0;
213 	curpdu_p = trans_p->pdu_rsp_list;
214 	while (curpdu_p != NULL) {
215 		if (curpdu_p->hdr.seq_id == pdu_p->hdr.seq_id) return 0;
216 		curpdu_p = curpdu_p->next;
217 	}
218 
219 	return 1;
220 }
221 
222 /*
223  * isns_process_in_pdu()
224  */
225 static void
226 isns_process_in_pdu(struct isns_config_s *cfg_p)
227 {
228 	struct isns_task_s *curtask_p;
229 	struct isns_trans_s *trans_p;
230 
231 	DBG("isns_process_in_pdu: entered\n");
232 
233 	if ((curtask_p = cfg_p->curtask_p) == NULL)
234 		isns_free_pdu(cfg_p->pdu_in_p);
235 	else if ((trans_p = curtask_p->var.send_pdu.trans_p) == NULL)
236 		isns_free_pdu(cfg_p->pdu_in_p);
237 	else if (!isns_is_valid_resp(trans_p, cfg_p->pdu_in_p))
238 		isns_free_pdu(cfg_p->pdu_in_p);
239 	else {
240 		isns_add_pdu_response(trans_p, cfg_p->pdu_in_p);
241 
242 		if (isns_is_trans_complete(trans_p)) {
243 			isns_complete_trans(trans_p);
244 			isns_end_task(curtask_p);
245 		}
246 	}
247 
248 	cfg_p->pdu_in_p = NULL;
249 }
250 
251 /*
252  * isns_kevent_socket()
253  */
254 int
255 isns_kevent_socket(struct kevent *evt_p, struct isns_config_s *cfg_p)
256 {
257 	struct iovec *iovp;
258 	struct isns_buffer_s *curbuf_p, *newbuf_p;
259 	struct isns_pdu_s *pdu_p;
260 	int64_t bavail; /* bytes available in socket buffer */
261 	uint32_t cur_len, buf_len, unread_len, rd_len, b_len;
262 	ssize_t rv;
263 	uint16_t payload_len;
264 	int iovcnt, more, transport_evt;
265 
266 
267 	DBG("isns_kevent_socket: entered\n");
268 
269 	transport_evt = 0;
270 	bavail = evt_p->data;
271 	iovp = read_buf;
272 
273 	more = (bavail > 0);
274 	while (more) {
275 		if (cfg_p->pdu_in_p == NULL) {
276 			/*
277  	 		 * Try to form a valid pdu by starting with the hdr.
278 			 * If there isn't enough data in the socket buffer
279 			 * to form a full hdr, just return.
280  	 		 *
281  	 		 * Once we have read in our hdr, allocate all buffers
282 			 * needed.
283  	 		 */
284 
285 			if (bavail < (int64_t)sizeof(struct isns_pdu_hdr_s))
286 				return 0;
287 
288 			/* Form a placeholder pdu */
289 			pdu_p = isns_new_pdu(cfg_p, 0, 0, 0);
290 
291 			/* Read the header into our placeholder pdu */
292 			read_buf[0].iov_base = &(pdu_p->hdr);
293 			read_buf[0].iov_len = sizeof(struct isns_pdu_hdr_s);
294 			iovcnt = 1;
295 
296 			iovp = read_buf;
297 			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
298 			if ((rv == 0) || (rv == -1)) {
299 				DBG("isns_kevent_socket: isns_socket_readv(1) "
300 				    "returned %d\n", rv);
301 				transport_evt = 1;
302 				break;
303 			}
304 
305 			bavail -= sizeof(struct isns_pdu_hdr_s);
306 			/*
307 			 * ToDo: read until sizeof(struct isns_pdu_hdr_s) has
308 			 *       been read in. This statement should be
309 			 *
310 			 *       bavail -= rv;
311 			 */
312 
313 			/* adjust byte order */
314 			pdu_p->hdr.isnsp_version = isns_ntohs(pdu_p->hdr.
315 			    isnsp_version);
316 			pdu_p->hdr.func_id = isns_ntohs(pdu_p->hdr.func_id);
317 			pdu_p->hdr.payload_len = isns_ntohs(pdu_p->hdr.
318 			    payload_len);
319 			pdu_p->hdr.flags = isns_ntohs(pdu_p->hdr.flags);
320 			pdu_p->hdr.trans_id = isns_ntohs(pdu_p->hdr.trans_id);
321 			pdu_p->hdr.seq_id = isns_ntohs(pdu_p->hdr.seq_id);
322 			pdu_p->byteorder_host = 1;
323 
324 			/* Try to sense early whether we might have garbage */
325 			if (pdu_p->hdr.isnsp_version != ISNSP_VERSION) {
326 				DBG("isns_kevent_socket: pdu_p->hdr."
327 				    "isnsp_version != ISNSP_VERSION\n");
328 				isns_free_pdu(pdu_p);
329 
330 				transport_evt = 1;
331 				break;
332 			}
333 
334 			/* Allocate all the necessary payload buffers */
335 			payload_len = pdu_p->hdr.payload_len;
336 			curbuf_p = pdu_p->payload_p;
337 			buf_len = 0;
338 			while (buf_len + curbuf_p->alloc_len < payload_len) {
339 				buf_len += curbuf_p->alloc_len;
340 				newbuf_p = isns_new_buffer(0);
341 				curbuf_p->next = newbuf_p;
342 				curbuf_p = newbuf_p;
343 			}
344 			curbuf_p->next = NULL;
345 
346 			/* Hold on to our placeholder pdu */
347 			cfg_p->pdu_in_p = pdu_p;
348 			more = (bavail > 0) ? 1 : 0;
349 		} else if (bavail > 0) {
350 			/*
351  	 		 * Fill in the pdu payload data.
352 			 *
353  	 		 * If we can fill it all in now
354 	 		 *     -AND- it corresponds to the active transaction
355 			 *           then add the pdu to the transaction's
356 			 *           pdu_rsp_list
357 	 		 *     -AND- it does not correspond to the active
358 			 *           transaction (or there is no active
359 			 *           transaction) then drop it on the floor.
360 			 * We may not be able to fill it all in now.
361 	 		 *     -EITHER WAY- fill in as much payload data now
362 			 *                  as we can.
363  	 		 */
364 
365 			/* Refer to our placeholder pdu */
366 			pdu_p = cfg_p->pdu_in_p;
367 
368 			/* How much payload data has been filled in? */
369 			cur_len = 0;
370 			curbuf_p = pdu_p->payload_p;
371 			while (curbuf_p->cur_len == curbuf_p->alloc_len) {
372 				cur_len += curbuf_p->cur_len;
373 				curbuf_p = curbuf_p->next;
374 			}
375 			cur_len += curbuf_p->cur_len;
376 
377 			/* How much payload data is left to be filled in? */
378 			unread_len = pdu_p->hdr.payload_len - cur_len;
379 
380 			/* Read as much remaining payload data as possible */
381 			iovcnt = 0;
382 			while (curbuf_p->next != NULL) {
383 				read_buf[iovcnt].iov_base = isns_buffer_data(
384 			    	    curbuf_p, curbuf_p->cur_len);
385 				read_buf[iovcnt].iov_len = curbuf_p->alloc_len -
386 			    	    curbuf_p->cur_len;
387 				iovcnt++;
388 
389 				curbuf_p = curbuf_p->next;
390 			}
391 			read_buf[iovcnt].iov_base = isns_buffer_data(curbuf_p,
392 		    	    curbuf_p->cur_len);
393 			read_buf[iovcnt].iov_len = unread_len;
394 			iovcnt++;
395 
396 			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
397 			if ((rv == 0) || (rv == -1)) {
398 				DBG("isns_kevent_socket: isns_socket_readv(2) "
399 			    	    "returned %d\n",rv);
400 				isns_free_pdu(cfg_p->pdu_in_p);
401 				cfg_p->pdu_in_p = NULL;
402 
403 				transport_evt = 1;
404 				break;
405 			}
406 
407 			/* Update cur_len in buffers that newly have data */
408 			curbuf_p = pdu_p->payload_p;
409 			while (curbuf_p->cur_len == curbuf_p->alloc_len)
410 				curbuf_p = curbuf_p->next;
411 
412 			rd_len = (uint32_t)rv;
413 			do {
414 				b_len = curbuf_p->alloc_len - curbuf_p->cur_len;
415 				if (rd_len > b_len) {
416 					curbuf_p->cur_len = curbuf_p->alloc_len;
417 					rd_len -= b_len;
418 				} else {
419 					curbuf_p->cur_len += rd_len;
420 					break;
421 				}
422 
423 				curbuf_p = curbuf_p->next;
424 			} while (curbuf_p != NULL);
425 
426 			bavail -= rv;
427 
428 			if (rv == (int)unread_len)
429 				isns_process_in_pdu(cfg_p);
430 
431 			more = (bavail > (int64_t)sizeof(struct isns_pdu_hdr_s)) ? 1 : 0;
432 		}
433 	}
434 
435 	transport_evt |= (evt_p->flags & EV_EOF);
436 	if (transport_evt) {
437 		DBG("isns_kevent_socket: processing transport event\n");
438 
439 		isns_socket_close(cfg_p->sd);
440 		cfg_p->sd_connected = 0;
441 
442 		if (cfg_p->curtask_p != NULL)
443 			isns_process_connection_loss(cfg_p);
444 
445 		if (cfg_p->pdu_in_p != NULL) {
446 			isns_free_pdu(cfg_p->pdu_in_p);
447 			cfg_p->pdu_in_p = NULL;
448 		}
449 	}
450 
451 	return 0;
452 }
453 
454 /* ARGSUSED */
455 /*
456  * isns_kevent_timer_recon()
457  */
458 int
459 isns_kevent_timer_recon(struct kevent *evt_p, struct isns_config_s *cfg_p)
460 {
461 	int rv;
462 
463 
464 	DBG("isns_kevent_timer_recon: entered\n");
465 
466 	rv = isns_socket_create(&(cfg_p->sd), cfg_p->ai_p->ai_family,
467 		cfg_p->ai_p->ai_socktype);
468 	if (rv != 0)
469 		return 0;
470 
471 	rv = isns_socket_connect(cfg_p->sd, cfg_p->ai_p->ai_addr,
472 	    cfg_p->ai_p->ai_addrlen);
473 	if (rv == 0) {
474 		/* Remove ISNS_EVT_TIMER_RECON from kqueue */
475 		rv = isns_change_kevent_list(cfg_p,
476 		    (uintptr_t)ISNS_EVT_TIMER_RECON, EVFILT_TIMER, EV_DELETE,
477 		    (int64_t)0, (intptr_t)0);
478 		if (rv == -1)
479 			DBG("isns_kevent_timer_recon: error on "
480 			    "isns_change_kevent_list(1)\n");
481 
482 		cfg_p->sd_connected = 1;
483 
484 		/* Add cfg_p->sd to kqueue */
485 		rv = isns_change_kevent_list(cfg_p, (uintptr_t)cfg_p->sd,
486 		    EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0,
487 		    (intptr_t)isns_kevent_socket);
488 		if (rv == -1)
489 			DBG("isns_kevent_timer_recon: error on "
490 			    "isns_change_kevent_list(2)\n");
491 
492 		isns_end_task(cfg_p->curtask_p);
493 	}
494 
495 	return 0;
496 }
497 
498 
499 /* ARGSUSED */
500 /*
501  * isns_kevent_timer_refresh
502  */
503 int
504 isns_kevent_timer_refresh(struct kevent* evt_p, struct isns_config_s *cfg_p)
505 {
506 	struct isns_refresh_s *ref_p;
507 	ISNS_TRANS trans;
508 	uint32_t status;
509 	int rval;
510 
511 	DBG("isns_kevent_timer_refresh: entered\n");
512 
513 	/* If refresh info pointer NULL, or no name assigned, just return. */
514 	ref_p = cfg_p->refresh_p;
515 	if ((ref_p == NULL) || (ref_p->node[0] == '\0'))
516 	    	return 0;
517 
518 	if (ref_p->trans_p != NULL) {
519 		/* If the previous refresh trans is not complete, return. */
520 		rval = isns_get_pdu_response_status(ref_p->trans_p, &status);
521 		if (rval == EPERM) {
522 			DBG("isns_kevent_timer_refresh: "
523 			    "prev refresh trans not complete\n");
524 			return 0;
525 		}
526 		/* Free previous refresh trans. */
527 		isns_free_trans(ref_p->trans_p);
528 		ref_p->trans_p = NULL;
529 	}
530 
531 	/* Build new refresh transaction and send it. */
532 	trans = isns_new_trans((ISNS_HANDLE)cfg_p, isnsp_DevAttrQry, 0);
533 	if (trans == ISNS_INVALID_TRANS) {
534 		DBG("isns_kevent_timer_refresh: error on isns_new_trans()\n");
535 		return 0;
536 	}
537 
538 	ref_p->trans_p = (struct isns_trans_s *)trans;
539 	/* First we add our source attribute */
540 	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
541 	/* Now add our message attribute */
542 	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
543 	isns_add_tlv(trans, isnst_Delimiter, 0, NULL);
544 	/* and finally the operating attributes */
545 	isns_add_tlv(trans, isnst_EID, 0, NULL);
546 	isns_send_trans(trans, NULL, NULL);
547 
548 	return 0;
549 }
550