xref: /freebsd-src/contrib/sendmail/libmilter/worker.c (revision e2c0e292e8a7ca00ba99bcfccc9e637f45c3e8b1)
1d0cef73dSGregory Neil Shapiro /*
25dd76dd0SGregory Neil Shapiro  *  Copyright (c) 2003-2004, 2007, 2009-2012 Proofpoint, Inc. and its suppliers.
3d0cef73dSGregory Neil Shapiro  *	All rights reserved.
4d0cef73dSGregory Neil Shapiro  *
5d0cef73dSGregory Neil Shapiro  * By using this file, you agree to the terms and conditions set
6d0cef73dSGregory Neil Shapiro  * forth in the LICENSE file which can be found at the top level of
7d0cef73dSGregory Neil Shapiro  * the sendmail distribution.
8d0cef73dSGregory Neil Shapiro  *
9d0cef73dSGregory Neil Shapiro  * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10d0cef73dSGregory Neil Shapiro  *   Jose-Marcio.Martins@ensmp.fr
11d0cef73dSGregory Neil Shapiro  */
12d0cef73dSGregory Neil Shapiro 
13d0cef73dSGregory Neil Shapiro #include <sm/gen.h>
144313cc83SGregory Neil Shapiro SM_RCSID("@(#)$Id: worker.c,v 8.25 2013-11-22 20:51:37 ca Exp $")
15d0cef73dSGregory Neil Shapiro 
16d0cef73dSGregory Neil Shapiro #include "libmilter.h"
17d0cef73dSGregory Neil Shapiro 
18d0cef73dSGregory Neil Shapiro #if _FFR_WORKERS_POOL
19d0cef73dSGregory Neil Shapiro 
20d0cef73dSGregory Neil Shapiro typedef struct taskmgr_S taskmgr_T;
21d0cef73dSGregory Neil Shapiro 
22d0cef73dSGregory Neil Shapiro #define TM_SIGNATURE		0x23021957
23d0cef73dSGregory Neil Shapiro 
24d0cef73dSGregory Neil Shapiro struct taskmgr_S
25d0cef73dSGregory Neil Shapiro {
26d0cef73dSGregory Neil Shapiro 	long		tm_signature; /* has the controller been initialized */
27d0cef73dSGregory Neil Shapiro 	sthread_t	tm_tid;	/* thread id of controller */
28d0cef73dSGregory Neil Shapiro 	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
29d0cef73dSGregory Neil Shapiro 
30d0cef73dSGregory Neil Shapiro 	int		tm_nb_workers;	/* number of workers in the pool */
31d0cef73dSGregory Neil Shapiro 	int		tm_nb_idle;	/* number of workers waiting */
32d0cef73dSGregory Neil Shapiro 
33d0cef73dSGregory Neil Shapiro 	int		tm_p[2];	/* poll control pipe */
34d0cef73dSGregory Neil Shapiro 
35d0cef73dSGregory Neil Shapiro 	smutex_t	tm_w_mutex;	/* linked list access mutex */
36d0cef73dSGregory Neil Shapiro 	scond_t		tm_w_cond;	/* */
37d0cef73dSGregory Neil Shapiro };
38d0cef73dSGregory Neil Shapiro 
39d0cef73dSGregory Neil Shapiro static taskmgr_T     Tskmgr = {0};
40d0cef73dSGregory Neil Shapiro 
41d0cef73dSGregory Neil Shapiro #define WRK_CTX_HEAD	Tskmgr.tm_ctx_head
42d0cef73dSGregory Neil Shapiro 
43d0cef73dSGregory Neil Shapiro #define RD_PIPE	(Tskmgr.tm_p[0])
44d0cef73dSGregory Neil Shapiro #define WR_PIPE	(Tskmgr.tm_p[1])
45d0cef73dSGregory Neil Shapiro 
46d0cef73dSGregory Neil Shapiro #define PIPE_SEND_SIGNAL()						\
47d0cef73dSGregory Neil Shapiro 	do								\
48d0cef73dSGregory Neil Shapiro 	{								\
49d0cef73dSGregory Neil Shapiro 		char evt = 0x5a;					\
50d0cef73dSGregory Neil Shapiro 		int fd = WR_PIPE;					\
51d0cef73dSGregory Neil Shapiro 		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
52d0cef73dSGregory Neil Shapiro 			smi_log(SMI_LOG_ERR,				\
53d0cef73dSGregory Neil Shapiro 				"Error writing to event pipe: %s",	\
54d0cef73dSGregory Neil Shapiro 				sm_errstring(errno));			\
55d0cef73dSGregory Neil Shapiro 	} while (0)
56d0cef73dSGregory Neil Shapiro 
57d0cef73dSGregory Neil Shapiro #ifndef USE_PIPE_WAKE_POLL
58d0cef73dSGregory Neil Shapiro # define USE_PIPE_WAKE_POLL 1
59*5b0945b5SGregory Neil Shapiro #endif
60d0cef73dSGregory Neil Shapiro 
61d0cef73dSGregory Neil Shapiro /* poll check periodicity (default 10000 - 10 s) */
62d0cef73dSGregory Neil Shapiro #define POLL_TIMEOUT   10000
63d0cef73dSGregory Neil Shapiro 
64d0cef73dSGregory Neil Shapiro /* worker conditional wait timeout (default 10 s) */
65d0cef73dSGregory Neil Shapiro #define COND_TIMEOUT     10
66d0cef73dSGregory Neil Shapiro 
67d0cef73dSGregory Neil Shapiro /* functions */
68d0cef73dSGregory Neil Shapiro static int mi_close_session __P((SMFICTX_PTR));
69d0cef73dSGregory Neil Shapiro 
70d0cef73dSGregory Neil Shapiro static void *mi_worker __P((void *));
71d0cef73dSGregory Neil Shapiro static void *mi_pool_controller __P((void *));
72d0cef73dSGregory Neil Shapiro 
73d0cef73dSGregory Neil Shapiro static int mi_list_add_ctx __P((SMFICTX_PTR));
74d0cef73dSGregory Neil Shapiro static int mi_list_del_ctx __P((SMFICTX_PTR));
75d0cef73dSGregory Neil Shapiro 
76d0cef73dSGregory Neil Shapiro /*
77d0cef73dSGregory Neil Shapiro **  periodicity of cleaning up old sessions (timedout)
78d0cef73dSGregory Neil Shapiro **	sessions list will be checked to find old inactive
79d0cef73dSGregory Neil Shapiro **	sessions each DT_CHECK_OLD_SESSIONS sec
80d0cef73dSGregory Neil Shapiro */
81d0cef73dSGregory Neil Shapiro 
82d0cef73dSGregory Neil Shapiro #define DT_CHECK_OLD_SESSIONS   600
83d0cef73dSGregory Neil Shapiro 
84d0cef73dSGregory Neil Shapiro #ifndef OLD_SESSION_TIMEOUT
85d0cef73dSGregory Neil Shapiro # define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
86*5b0945b5SGregory Neil Shapiro #endif
87d0cef73dSGregory Neil Shapiro 
88d0cef73dSGregory Neil Shapiro /* session states - with respect to the pool of workers */
89d0cef73dSGregory Neil Shapiro #define WKST_INIT		0	/* initial state */
90d0cef73dSGregory Neil Shapiro #define WKST_READY_TO_RUN	1	/* command ready do be read */
91d0cef73dSGregory Neil Shapiro #define WKST_RUNNING		2	/* session running on a worker */
92d0cef73dSGregory Neil Shapiro #define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
93d0cef73dSGregory Neil Shapiro #define WKST_WAITING		4	/* waiting for new command */
94d0cef73dSGregory Neil Shapiro #define WKST_CLOSING		5	/* session finished */
95d0cef73dSGregory Neil Shapiro 
96d0cef73dSGregory Neil Shapiro #ifndef MIN_WORKERS
97d0cef73dSGregory Neil Shapiro # define MIN_WORKERS	2  /* minimum number of threads to keep around */
98d0cef73dSGregory Neil Shapiro #endif
99d0cef73dSGregory Neil Shapiro 
100d0cef73dSGregory Neil Shapiro #define MIN_IDLE	1  /* minimum number of idle threads */
101d0cef73dSGregory Neil Shapiro 
102d0cef73dSGregory Neil Shapiro 
103d0cef73dSGregory Neil Shapiro /*
104d0cef73dSGregory Neil Shapiro **  Macros for threads and mutex management
105d0cef73dSGregory Neil Shapiro */
106d0cef73dSGregory Neil Shapiro 
107d0cef73dSGregory Neil Shapiro #define TASKMGR_LOCK()							\
108d0cef73dSGregory Neil Shapiro 	do								\
109d0cef73dSGregory Neil Shapiro 	{								\
110d0cef73dSGregory Neil Shapiro 		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
111d0cef73dSGregory Neil Shapiro 			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
112d0cef73dSGregory Neil Shapiro 	} while (0)
113d0cef73dSGregory Neil Shapiro 
114d0cef73dSGregory Neil Shapiro #define TASKMGR_UNLOCK()						\
115d0cef73dSGregory Neil Shapiro 	do								\
116d0cef73dSGregory Neil Shapiro 	{								\
117d0cef73dSGregory Neil Shapiro 		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
118d0cef73dSGregory Neil Shapiro 			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
119d0cef73dSGregory Neil Shapiro 	} while (0)
120d0cef73dSGregory Neil Shapiro 
121d0cef73dSGregory Neil Shapiro #define	TASKMGR_COND_WAIT()						\
122d0cef73dSGregory Neil Shapiro 	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
123d0cef73dSGregory Neil Shapiro 
124d0cef73dSGregory Neil Shapiro #define	TASKMGR_COND_SIGNAL()						\
125d0cef73dSGregory Neil Shapiro 	do								\
126d0cef73dSGregory Neil Shapiro 	{								\
127d0cef73dSGregory Neil Shapiro 		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
128d0cef73dSGregory Neil Shapiro 			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
129d0cef73dSGregory Neil Shapiro 	} while (0)
130d0cef73dSGregory Neil Shapiro 
131d0cef73dSGregory Neil Shapiro #define LAUNCH_WORKER(ctx)						\
132d0cef73dSGregory Neil Shapiro 	do								\
133d0cef73dSGregory Neil Shapiro 	{								\
134d0cef73dSGregory Neil Shapiro 		int r;							\
135d0cef73dSGregory Neil Shapiro 		sthread_t tid;						\
136d0cef73dSGregory Neil Shapiro 									\
137d0cef73dSGregory Neil Shapiro 		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
138d0cef73dSGregory Neil Shapiro 			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
139d0cef73dSGregory Neil Shapiro 				sm_errstring(r));			\
140d0cef73dSGregory Neil Shapiro 	} while (0)
141d0cef73dSGregory Neil Shapiro 
142d0cef73dSGregory Neil Shapiro #if POOL_DEBUG
143d0cef73dSGregory Neil Shapiro # define POOL_LEV_DPRINTF(lev, x)					\
144ba00ec3dSGregory Neil Shapiro 	do								\
145ba00ec3dSGregory Neil Shapiro 	{								\
146*5b0945b5SGregory Neil Shapiro 		if (ctx != NULL && (lev) < ctx->ctx_dbg)		\
147d0cef73dSGregory Neil Shapiro 			sm_dprintf x;					\
148d0cef73dSGregory Neil Shapiro 	} while (0)
149d0cef73dSGregory Neil Shapiro #else /* POOL_DEBUG */
150d0cef73dSGregory Neil Shapiro # define POOL_LEV_DPRINTF(lev, x)
151d0cef73dSGregory Neil Shapiro #endif /* POOL_DEBUG */
152d0cef73dSGregory Neil Shapiro 
153d0cef73dSGregory Neil Shapiro /*
154d0cef73dSGregory Neil Shapiro **  MI_START_SESSION -- Start a session in the pool of workers
155d0cef73dSGregory Neil Shapiro **
156d0cef73dSGregory Neil Shapiro **	Parameters:
157d0cef73dSGregory Neil Shapiro **		ctx -- context structure
158d0cef73dSGregory Neil Shapiro **
159d0cef73dSGregory Neil Shapiro **	Returns:
160d0cef73dSGregory Neil Shapiro **		MI_SUCCESS/MI_FAILURE
161d0cef73dSGregory Neil Shapiro */
162d0cef73dSGregory Neil Shapiro 
163d0cef73dSGregory Neil Shapiro int
mi_start_session(ctx)164d0cef73dSGregory Neil Shapiro mi_start_session(ctx)
165d0cef73dSGregory Neil Shapiro 	SMFICTX_PTR ctx;
166d0cef73dSGregory Neil Shapiro {
167d0cef73dSGregory Neil Shapiro 	static long id = 0;
168d0cef73dSGregory Neil Shapiro 
1696f9c8e5bSGregory Neil Shapiro 	/* this can happen if the milter is shutting down */
1706f9c8e5bSGregory Neil Shapiro 	if (Tskmgr.tm_signature != TM_SIGNATURE)
1716f9c8e5bSGregory Neil Shapiro 		return MI_FAILURE;
172d0cef73dSGregory Neil Shapiro 	SM_ASSERT(ctx != NULL);
173d0cef73dSGregory Neil Shapiro 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
174d0cef73dSGregory Neil Shapiro 	TASKMGR_LOCK();
175d0cef73dSGregory Neil Shapiro 
176d0cef73dSGregory Neil Shapiro 	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
177d0cef73dSGregory Neil Shapiro 	{
178d0cef73dSGregory Neil Shapiro 		TASKMGR_UNLOCK();
179d0cef73dSGregory Neil Shapiro 		return MI_FAILURE;
180d0cef73dSGregory Neil Shapiro 	}
181d0cef73dSGregory Neil Shapiro 
182d0cef73dSGregory Neil Shapiro 	ctx->ctx_sid = id++;
183d0cef73dSGregory Neil Shapiro 
184d0cef73dSGregory Neil Shapiro 	/* if there is an idle worker, signal it, otherwise start new worker */
185d0cef73dSGregory Neil Shapiro 	if (Tskmgr.tm_nb_idle > 0)
186d0cef73dSGregory Neil Shapiro 	{
187d0cef73dSGregory Neil Shapiro 		ctx->ctx_wstate = WKST_READY_TO_RUN;
188d0cef73dSGregory Neil Shapiro 		TASKMGR_COND_SIGNAL();
189d0cef73dSGregory Neil Shapiro 	}
190d0cef73dSGregory Neil Shapiro 	else
191d0cef73dSGregory Neil Shapiro 	{
192d0cef73dSGregory Neil Shapiro 		ctx->ctx_wstate = WKST_RUNNING;
193d0cef73dSGregory Neil Shapiro 		LAUNCH_WORKER(ctx);
194d0cef73dSGregory Neil Shapiro 	}
195d0cef73dSGregory Neil Shapiro 	TASKMGR_UNLOCK();
196d0cef73dSGregory Neil Shapiro 	return MI_SUCCESS;
197d0cef73dSGregory Neil Shapiro }
198d0cef73dSGregory Neil Shapiro 
199d0cef73dSGregory Neil Shapiro /*
200d0cef73dSGregory Neil Shapiro **  MI_CLOSE_SESSION -- Close a session and clean up data structures
201d0cef73dSGregory Neil Shapiro **
202d0cef73dSGregory Neil Shapiro **	Parameters:
203d0cef73dSGregory Neil Shapiro **		ctx -- context structure
204d0cef73dSGregory Neil Shapiro **
205d0cef73dSGregory Neil Shapiro **	Returns:
206d0cef73dSGregory Neil Shapiro **		MI_SUCCESS/MI_FAILURE
207d0cef73dSGregory Neil Shapiro */
208d0cef73dSGregory Neil Shapiro 
209d0cef73dSGregory Neil Shapiro static int
mi_close_session(ctx)210d0cef73dSGregory Neil Shapiro mi_close_session(ctx)
211d0cef73dSGregory Neil Shapiro 	SMFICTX_PTR ctx;
212d0cef73dSGregory Neil Shapiro {
213d0cef73dSGregory Neil Shapiro 	SM_ASSERT(ctx != NULL);
214d0cef73dSGregory Neil Shapiro 
215d0cef73dSGregory Neil Shapiro 	(void) mi_list_del_ctx(ctx);
2169bd497b8SGregory Neil Shapiro 	mi_clr_ctx(ctx);
217d0cef73dSGregory Neil Shapiro 
218d0cef73dSGregory Neil Shapiro 	return MI_SUCCESS;
219d0cef73dSGregory Neil Shapiro }
220d0cef73dSGregory Neil Shapiro 
221d0cef73dSGregory Neil Shapiro /*
2226f9c8e5bSGregory Neil Shapiro **  NONBLOCKING -- set nonblocking mode for a file descriptor.
2236f9c8e5bSGregory Neil Shapiro **
2246f9c8e5bSGregory Neil Shapiro **	Parameters:
2256f9c8e5bSGregory Neil Shapiro **		fd -- file descriptor
2266f9c8e5bSGregory Neil Shapiro **		name -- name for (error) logging
2276f9c8e5bSGregory Neil Shapiro **
2286f9c8e5bSGregory Neil Shapiro **	Returns:
2296f9c8e5bSGregory Neil Shapiro **		MI_SUCCESS/MI_FAILURE
2306f9c8e5bSGregory Neil Shapiro */
2316f9c8e5bSGregory Neil Shapiro 
2326f9c8e5bSGregory Neil Shapiro static int
nonblocking(int fd,const char * name)2336f9c8e5bSGregory Neil Shapiro nonblocking(int fd, const char *name)
2346f9c8e5bSGregory Neil Shapiro {
2356f9c8e5bSGregory Neil Shapiro 	int r;
2366f9c8e5bSGregory Neil Shapiro 
2376f9c8e5bSGregory Neil Shapiro 	errno = 0;
2386f9c8e5bSGregory Neil Shapiro 	r = fcntl(fd, F_GETFL, 0);
2396f9c8e5bSGregory Neil Shapiro 	if (r == -1)
2406f9c8e5bSGregory Neil Shapiro 	{
2416f9c8e5bSGregory Neil Shapiro 		smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
2426f9c8e5bSGregory Neil Shapiro 			name, sm_errstring(errno));
2436f9c8e5bSGregory Neil Shapiro 		return MI_FAILURE;
2446f9c8e5bSGregory Neil Shapiro 	}
2456f9c8e5bSGregory Neil Shapiro 	errno = 0;
2466f9c8e5bSGregory Neil Shapiro 	r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
2476f9c8e5bSGregory Neil Shapiro 	if (r == -1)
2486f9c8e5bSGregory Neil Shapiro 	{
2496f9c8e5bSGregory Neil Shapiro 		smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
2506f9c8e5bSGregory Neil Shapiro 			name, sm_errstring(errno));
2516f9c8e5bSGregory Neil Shapiro 		return MI_FAILURE;
2526f9c8e5bSGregory Neil Shapiro 	}
2536f9c8e5bSGregory Neil Shapiro 	return MI_SUCCESS;
2546f9c8e5bSGregory Neil Shapiro }
2556f9c8e5bSGregory Neil Shapiro 
2566f9c8e5bSGregory Neil Shapiro /*
257da7d7b9cSGregory Neil Shapiro **  MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
258d0cef73dSGregory Neil Shapiro **		Must be called before starting sessions.
259d0cef73dSGregory Neil Shapiro **
260d0cef73dSGregory Neil Shapiro **	Parameters:
261d0cef73dSGregory Neil Shapiro **		none
262d0cef73dSGregory Neil Shapiro **
263d0cef73dSGregory Neil Shapiro **	Returns:
264d0cef73dSGregory Neil Shapiro **		MI_SUCCESS/MI_FAILURE
265d0cef73dSGregory Neil Shapiro */
266d0cef73dSGregory Neil Shapiro 
267d0cef73dSGregory Neil Shapiro int
mi_pool_controller_init()268d0cef73dSGregory Neil Shapiro mi_pool_controller_init()
269d0cef73dSGregory Neil Shapiro {
270d0cef73dSGregory Neil Shapiro 	sthread_t tid;
271d0cef73dSGregory Neil Shapiro 	int r, i;
272d0cef73dSGregory Neil Shapiro 
273d0cef73dSGregory Neil Shapiro 	if (Tskmgr.tm_signature == TM_SIGNATURE)
274d0cef73dSGregory Neil Shapiro 		return MI_SUCCESS;
275d0cef73dSGregory Neil Shapiro 
276d0cef73dSGregory Neil Shapiro 	SM_TAILQ_INIT(&WRK_CTX_HEAD);
277d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_tid = (sthread_t) -1;
278d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_nb_workers = 0;
279d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_nb_idle = 0;
280d0cef73dSGregory Neil Shapiro 
281d0cef73dSGregory Neil Shapiro 	if (pipe(Tskmgr.tm_p) != 0)
282d0cef73dSGregory Neil Shapiro 	{
283d0cef73dSGregory Neil Shapiro 		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
2849bd497b8SGregory Neil Shapiro 			sm_errstring(errno));
285d0cef73dSGregory Neil Shapiro 		return MI_FAILURE;
286d0cef73dSGregory Neil Shapiro 	}
2876f9c8e5bSGregory Neil Shapiro 	r = nonblocking(WR_PIPE, "WR_PIPE");
2886f9c8e5bSGregory Neil Shapiro 	if (r != MI_SUCCESS)
2896f9c8e5bSGregory Neil Shapiro 		return r;
2906f9c8e5bSGregory Neil Shapiro 	r = nonblocking(RD_PIPE, "RD_PIPE");
2916f9c8e5bSGregory Neil Shapiro 	if (r != MI_SUCCESS)
2926f9c8e5bSGregory Neil Shapiro 		return r;
293d0cef73dSGregory Neil Shapiro 
294d0cef73dSGregory Neil Shapiro 	(void) smutex_init(&Tskmgr.tm_w_mutex);
295d0cef73dSGregory Neil Shapiro 	(void) scond_init(&Tskmgr.tm_w_cond);
296d0cef73dSGregory Neil Shapiro 
297d0cef73dSGregory Neil Shapiro 	/* Launch the pool controller */
298d0cef73dSGregory Neil Shapiro 	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
299d0cef73dSGregory Neil Shapiro 	{
300d0cef73dSGregory Neil Shapiro 		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
301d0cef73dSGregory Neil Shapiro 			sm_errstring(r));
302d0cef73dSGregory Neil Shapiro 		return MI_FAILURE;
303d0cef73dSGregory Neil Shapiro 	}
304d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_tid = tid;
305d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_signature = TM_SIGNATURE;
306d0cef73dSGregory Neil Shapiro 
307d0cef73dSGregory Neil Shapiro 	/* Create the pool of workers */
308d0cef73dSGregory Neil Shapiro 	for (i = 0; i < MIN_WORKERS; i++)
309d0cef73dSGregory Neil Shapiro 	{
310d0cef73dSGregory Neil Shapiro 		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
311d0cef73dSGregory Neil Shapiro 		{
312d0cef73dSGregory Neil Shapiro 			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
313d0cef73dSGregory Neil Shapiro 				sm_errstring(r));
314d0cef73dSGregory Neil Shapiro 			return MI_FAILURE;
315d0cef73dSGregory Neil Shapiro 		}
316d0cef73dSGregory Neil Shapiro 	}
317d0cef73dSGregory Neil Shapiro 
318d0cef73dSGregory Neil Shapiro 	return MI_SUCCESS;
319d0cef73dSGregory Neil Shapiro }
320d0cef73dSGregory Neil Shapiro 
321d0cef73dSGregory Neil Shapiro /*
322d0cef73dSGregory Neil Shapiro **  MI_POOL_CONTROLLER -- manage the pool of workers
323d0cef73dSGregory Neil Shapiro **	This thread must be running when listener begins
324d0cef73dSGregory Neil Shapiro **	starting sessions
325d0cef73dSGregory Neil Shapiro **
326d0cef73dSGregory Neil Shapiro **	Parameters:
327d0cef73dSGregory Neil Shapiro **		arg -- unused
328d0cef73dSGregory Neil Shapiro **
329d0cef73dSGregory Neil Shapiro **	Returns:
330d0cef73dSGregory Neil Shapiro **		NULL
331d0cef73dSGregory Neil Shapiro **
332d0cef73dSGregory Neil Shapiro **	Control flow:
333d0cef73dSGregory Neil Shapiro **		for (;;)
334d0cef73dSGregory Neil Shapiro **			Look for timed out sessions
335d0cef73dSGregory Neil Shapiro **			Select sessions to wait for sendmail command
336d0cef73dSGregory Neil Shapiro **			Poll set of file descriptors
337d0cef73dSGregory Neil Shapiro **			if timeout
338d0cef73dSGregory Neil Shapiro **				continue
339d0cef73dSGregory Neil Shapiro **			For each file descriptor ready
340d0cef73dSGregory Neil Shapiro **				launch new thread if no worker available
341d0cef73dSGregory Neil Shapiro **				else
342d0cef73dSGregory Neil Shapiro **				signal waiting worker
343d0cef73dSGregory Neil Shapiro */
344d0cef73dSGregory Neil Shapiro 
345d0cef73dSGregory Neil Shapiro /* Poll structure array (pollfd) size step */
346d0cef73dSGregory Neil Shapiro #define PFD_STEP	256
347d0cef73dSGregory Neil Shapiro 
348d0cef73dSGregory Neil Shapiro #define WAIT_FD(i)	(pfd[i].fd)
349d0cef73dSGregory Neil Shapiro #define WAITFN		"POLL"
350d0cef73dSGregory Neil Shapiro 
351d0cef73dSGregory Neil Shapiro static void *
mi_pool_controller(arg)352d0cef73dSGregory Neil Shapiro mi_pool_controller(arg)
353d0cef73dSGregory Neil Shapiro 	void *arg;
354d0cef73dSGregory Neil Shapiro {
355d0cef73dSGregory Neil Shapiro 	struct pollfd *pfd = NULL;
356d0cef73dSGregory Neil Shapiro 	int dim_pfd = 0;
357d0cef73dSGregory Neil Shapiro 	bool rebuild_set = true;
358d0cef73dSGregory Neil Shapiro 	int pcnt = 0; /* error count for poll() failures */
3599bd497b8SGregory Neil Shapiro 	time_t lastcheck;
360d0cef73dSGregory Neil Shapiro 
361d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_tid = sthread_get_id();
362d0cef73dSGregory Neil Shapiro 	if (pthread_detach(Tskmgr.tm_tid) != 0)
363d0cef73dSGregory Neil Shapiro 	{
364d0cef73dSGregory Neil Shapiro 		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
365d0cef73dSGregory Neil Shapiro 		return NULL;
366d0cef73dSGregory Neil Shapiro 	}
367d0cef73dSGregory Neil Shapiro 
368d0cef73dSGregory Neil Shapiro 	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
369d0cef73dSGregory Neil Shapiro 	if (pfd == NULL)
370d0cef73dSGregory Neil Shapiro 	{
371d0cef73dSGregory Neil Shapiro 		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
372d0cef73dSGregory Neil Shapiro 			sm_errstring(errno));
373d0cef73dSGregory Neil Shapiro 		return NULL;
374d0cef73dSGregory Neil Shapiro 	}
375d0cef73dSGregory Neil Shapiro 	dim_pfd = PFD_STEP;
376d0cef73dSGregory Neil Shapiro 
3779bd497b8SGregory Neil Shapiro 	lastcheck = time(NULL);
378d0cef73dSGregory Neil Shapiro 	for (;;)
379d0cef73dSGregory Neil Shapiro 	{
380d0cef73dSGregory Neil Shapiro 		SMFICTX_PTR ctx;
381ba00ec3dSGregory Neil Shapiro 		int nfd, r, i;
382d0cef73dSGregory Neil Shapiro 		time_t now;
383d0cef73dSGregory Neil Shapiro 
384d0cef73dSGregory Neil Shapiro 		if (mi_stop() != MILTER_CONT)
385d0cef73dSGregory Neil Shapiro 			break;
386d0cef73dSGregory Neil Shapiro 
387d0cef73dSGregory Neil Shapiro 		TASKMGR_LOCK();
388d0cef73dSGregory Neil Shapiro 
389d0cef73dSGregory Neil Shapiro 		now = time(NULL);
390d0cef73dSGregory Neil Shapiro 
391d0cef73dSGregory Neil Shapiro 		/* check for timed out sessions? */
392d0cef73dSGregory Neil Shapiro 		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
393d0cef73dSGregory Neil Shapiro 		{
3949bd497b8SGregory Neil Shapiro 			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
3959bd497b8SGregory Neil Shapiro 			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
396d0cef73dSGregory Neil Shapiro 			{
3979bd497b8SGregory Neil Shapiro 				SMFICTX_PTR ctx_nxt;
3989bd497b8SGregory Neil Shapiro 
3999bd497b8SGregory Neil Shapiro 				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
400d0cef73dSGregory Neil Shapiro 				if (ctx->ctx_wstate == WKST_WAITING)
401d0cef73dSGregory Neil Shapiro 				{
402d0cef73dSGregory Neil Shapiro 					if (ctx->ctx_wait == 0)
403d0cef73dSGregory Neil Shapiro 						ctx->ctx_wait = now;
4049bd497b8SGregory Neil Shapiro 					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
405d0cef73dSGregory Neil Shapiro 						 < now)
406d0cef73dSGregory Neil Shapiro 					{
4079bd497b8SGregory Neil Shapiro 						/* if session timed out, close it */
408d0cef73dSGregory Neil Shapiro 						sfsistat (*fi_close) __P((SMFICTX *));
409d0cef73dSGregory Neil Shapiro 
410d0cef73dSGregory Neil Shapiro 						POOL_LEV_DPRINTF(4,
411d0cef73dSGregory Neil Shapiro 							("Closing old connection: sd=%d id=%d",
412d0cef73dSGregory Neil Shapiro 							ctx->ctx_sd,
413d0cef73dSGregory Neil Shapiro 							ctx->ctx_sid));
414d0cef73dSGregory Neil Shapiro 
415d0cef73dSGregory Neil Shapiro 						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
416d0cef73dSGregory Neil Shapiro 							(void) (*fi_close)(ctx);
417d0cef73dSGregory Neil Shapiro 
418d0cef73dSGregory Neil Shapiro 						mi_close_session(ctx);
419d0cef73dSGregory Neil Shapiro 					}
420d0cef73dSGregory Neil Shapiro 				}
4219bd497b8SGregory Neil Shapiro 				ctx = ctx_nxt;
422d0cef73dSGregory Neil Shapiro 			}
423d0cef73dSGregory Neil Shapiro 			lastcheck = now;
424d0cef73dSGregory Neil Shapiro 		}
425d0cef73dSGregory Neil Shapiro 
426d0cef73dSGregory Neil Shapiro 		if (rebuild_set)
427d0cef73dSGregory Neil Shapiro 		{
428d0cef73dSGregory Neil Shapiro 			/*
429d0cef73dSGregory Neil Shapiro 			**  Initialize poll set.
430d0cef73dSGregory Neil Shapiro 			**  Insert into the poll set the file descriptors of
431d0cef73dSGregory Neil Shapiro 			**  all sessions waiting for a command from sendmail.
432d0cef73dSGregory Neil Shapiro 			*/
433d0cef73dSGregory Neil Shapiro 
434d0cef73dSGregory Neil Shapiro 			nfd = 0;
435d0cef73dSGregory Neil Shapiro 
436d0cef73dSGregory Neil Shapiro 			/* begin with worker pipe */
437d0cef73dSGregory Neil Shapiro 			pfd[nfd].fd = RD_PIPE;
438d0cef73dSGregory Neil Shapiro 			pfd[nfd].events = MI_POLL_RD_FLAGS;
439d0cef73dSGregory Neil Shapiro 			pfd[nfd].revents = 0;
440d0cef73dSGregory Neil Shapiro 			nfd++;
441d0cef73dSGregory Neil Shapiro 
442d0cef73dSGregory Neil Shapiro 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
443d0cef73dSGregory Neil Shapiro 			{
444d0cef73dSGregory Neil Shapiro 				/*
445d0cef73dSGregory Neil Shapiro 				**  update ctx_wait - start of wait moment -
446d0cef73dSGregory Neil Shapiro 				**  for timeout
447d0cef73dSGregory Neil Shapiro 				*/
448d0cef73dSGregory Neil Shapiro 
449d0cef73dSGregory Neil Shapiro 				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
450d0cef73dSGregory Neil Shapiro 					ctx->ctx_wait = now;
451d0cef73dSGregory Neil Shapiro 
452d0cef73dSGregory Neil Shapiro 				/* add the session to the pollfd array? */
453d0cef73dSGregory Neil Shapiro 				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
454d0cef73dSGregory Neil Shapiro 				    (ctx->ctx_wstate == WKST_WAITING))
455d0cef73dSGregory Neil Shapiro 				{
456d0cef73dSGregory Neil Shapiro 					/*
457d0cef73dSGregory Neil Shapiro 					**  Resize the pollfd array if it
458d0cef73dSGregory Neil Shapiro 					**  isn't large enough.
459d0cef73dSGregory Neil Shapiro 					*/
460d0cef73dSGregory Neil Shapiro 
461d0cef73dSGregory Neil Shapiro 					if (nfd >= dim_pfd)
462d0cef73dSGregory Neil Shapiro 					{
463d0cef73dSGregory Neil Shapiro 						struct pollfd *tpfd;
464d0cef73dSGregory Neil Shapiro 						size_t new;
465d0cef73dSGregory Neil Shapiro 
466d0cef73dSGregory Neil Shapiro 						new = (dim_pfd + PFD_STEP) *
467d0cef73dSGregory Neil Shapiro 							sizeof(*tpfd);
468d0cef73dSGregory Neil Shapiro 						tpfd = (struct pollfd *)
469d0cef73dSGregory Neil Shapiro 							realloc(pfd, new);
470d0cef73dSGregory Neil Shapiro 						if (tpfd != NULL)
471d0cef73dSGregory Neil Shapiro 						{
472d0cef73dSGregory Neil Shapiro 							pfd = tpfd;
473d0cef73dSGregory Neil Shapiro 							dim_pfd += PFD_STEP;
474d0cef73dSGregory Neil Shapiro 						}
475d0cef73dSGregory Neil Shapiro 						else
476d0cef73dSGregory Neil Shapiro 						{
477d0cef73dSGregory Neil Shapiro 							smi_log(SMI_LOG_ERR,
478d0cef73dSGregory Neil Shapiro 								"Failed to realloc pollfd array:%s",
479d0cef73dSGregory Neil Shapiro 								sm_errstring(errno));
480d0cef73dSGregory Neil Shapiro 						}
481d0cef73dSGregory Neil Shapiro 					}
482d0cef73dSGregory Neil Shapiro 
483d0cef73dSGregory Neil Shapiro 					/* add the session to pollfd array */
484d0cef73dSGregory Neil Shapiro 					if (nfd < dim_pfd)
485d0cef73dSGregory Neil Shapiro 					{
486d0cef73dSGregory Neil Shapiro 						ctx->ctx_wstate = WKST_WAITING;
487d0cef73dSGregory Neil Shapiro 						pfd[nfd].fd = ctx->ctx_sd;
488d0cef73dSGregory Neil Shapiro 						pfd[nfd].events = MI_POLL_RD_FLAGS;
489d0cef73dSGregory Neil Shapiro 						pfd[nfd].revents = 0;
490d0cef73dSGregory Neil Shapiro 						nfd++;
491d0cef73dSGregory Neil Shapiro 					}
492d0cef73dSGregory Neil Shapiro 				}
493d0cef73dSGregory Neil Shapiro 			}
4949bd497b8SGregory Neil Shapiro 			rebuild_set = false;
495d0cef73dSGregory Neil Shapiro 		}
496d0cef73dSGregory Neil Shapiro 
497d0cef73dSGregory Neil Shapiro 		TASKMGR_UNLOCK();
498d0cef73dSGregory Neil Shapiro 
499d0cef73dSGregory Neil Shapiro 		/* Everything is ready, let's wait for an event */
500ba00ec3dSGregory Neil Shapiro 		r = poll(pfd, nfd, POLL_TIMEOUT);
501d0cef73dSGregory Neil Shapiro 
502d0cef73dSGregory Neil Shapiro 		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
503d0cef73dSGregory Neil Shapiro 			WAITFN, now, nfd));
504d0cef73dSGregory Neil Shapiro 
505d0cef73dSGregory Neil Shapiro 		/* timeout */
506ba00ec3dSGregory Neil Shapiro 		if (r == 0)
507d0cef73dSGregory Neil Shapiro 			continue;
508d0cef73dSGregory Neil Shapiro 
509d0cef73dSGregory Neil Shapiro 		rebuild_set = true;
510d0cef73dSGregory Neil Shapiro 
511d0cef73dSGregory Neil Shapiro 		/* error */
512ba00ec3dSGregory Neil Shapiro 		if (r < 0)
513d0cef73dSGregory Neil Shapiro 		{
514d0cef73dSGregory Neil Shapiro 			if (errno == EINTR)
515d0cef73dSGregory Neil Shapiro 				continue;
516d0cef73dSGregory Neil Shapiro 			pcnt++;
517d0cef73dSGregory Neil Shapiro 			smi_log(SMI_LOG_ERR,
518d0cef73dSGregory Neil Shapiro 				"%s() failed (%s), %s",
519d0cef73dSGregory Neil Shapiro 				WAITFN, sm_errstring(errno),
520d0cef73dSGregory Neil Shapiro 				pcnt >= MAX_FAILS_S ? "abort" : "try again");
521d0cef73dSGregory Neil Shapiro 
522d0cef73dSGregory Neil Shapiro 			if (pcnt >= MAX_FAILS_S)
523d0cef73dSGregory Neil Shapiro 				goto err;
524ba00ec3dSGregory Neil Shapiro 			continue;
525d0cef73dSGregory Neil Shapiro 		}
526d0cef73dSGregory Neil Shapiro 		pcnt = 0;
527d0cef73dSGregory Neil Shapiro 
528d0cef73dSGregory Neil Shapiro 		/* something happened */
529d0cef73dSGregory Neil Shapiro 		for (i = 0; i < nfd; i++)
530d0cef73dSGregory Neil Shapiro 		{
531d0cef73dSGregory Neil Shapiro 			if (pfd[i].revents == 0)
532d0cef73dSGregory Neil Shapiro 				continue;
533d0cef73dSGregory Neil Shapiro 
534d0cef73dSGregory Neil Shapiro 			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
535d0cef73dSGregory Neil Shapiro 				WAITFN, i, nfd,
536d0cef73dSGregory Neil Shapiro 			WAIT_FD(i)));
537d0cef73dSGregory Neil Shapiro 
538d0cef73dSGregory Neil Shapiro 			/* has a worker signaled an end of task? */
539d0cef73dSGregory Neil Shapiro 			if (WAIT_FD(i) == RD_PIPE)
540d0cef73dSGregory Neil Shapiro 			{
5416f9c8e5bSGregory Neil Shapiro 				char evts[256];
5426f9c8e5bSGregory Neil Shapiro 				ssize_t r;
543d0cef73dSGregory Neil Shapiro 
544d0cef73dSGregory Neil Shapiro 				POOL_LEV_DPRINTF(4,
545d0cef73dSGregory Neil Shapiro 					("PIPE WILL READ evt = %08X %08X",
546d0cef73dSGregory Neil Shapiro 					pfd[i].events, pfd[i].revents));
547d0cef73dSGregory Neil Shapiro 
5486f9c8e5bSGregory Neil Shapiro 				r = 1;
5496f9c8e5bSGregory Neil Shapiro 				while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
5506f9c8e5bSGregory Neil Shapiro 					&& r != -1)
551d0cef73dSGregory Neil Shapiro 				{
5526f9c8e5bSGregory Neil Shapiro 					r = read(RD_PIPE, evts, sizeof(evts));
553d0cef73dSGregory Neil Shapiro 				}
554d0cef73dSGregory Neil Shapiro 
555d0cef73dSGregory Neil Shapiro 				POOL_LEV_DPRINTF(4,
556d0cef73dSGregory Neil Shapiro 					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
5576f9c8e5bSGregory Neil Shapiro 					i, RD_PIPE, (int) r, evts[0]));
558d0cef73dSGregory Neil Shapiro 
559d0cef73dSGregory Neil Shapiro 				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
560d0cef73dSGregory Neil Shapiro 				{
561d0cef73dSGregory Neil Shapiro 					/* Exception handling */
562d0cef73dSGregory Neil Shapiro 				}
563d0cef73dSGregory Neil Shapiro 				continue;
564d0cef73dSGregory Neil Shapiro 			}
565d0cef73dSGregory Neil Shapiro 
566ba00ec3dSGregory Neil Shapiro 			/*
567ba00ec3dSGregory Neil Shapiro 			**  Not the pipe for workers waking us,
568ba00ec3dSGregory Neil Shapiro 			**  so must be something on an MTA connection.
569ba00ec3dSGregory Neil Shapiro 			*/
570ba00ec3dSGregory Neil Shapiro 
571ba00ec3dSGregory Neil Shapiro 			TASKMGR_LOCK();
572d0cef73dSGregory Neil Shapiro 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
573d0cef73dSGregory Neil Shapiro 			{
574d0cef73dSGregory Neil Shapiro 				if (ctx->ctx_wstate != WKST_WAITING)
575d0cef73dSGregory Neil Shapiro 					continue;
576d0cef73dSGregory Neil Shapiro 
577d0cef73dSGregory Neil Shapiro 				POOL_LEV_DPRINTF(4,
578d0cef73dSGregory Neil Shapiro 					("Checking context sd=%d - fd=%d ",
579d0cef73dSGregory Neil Shapiro 					ctx->ctx_sd , WAIT_FD(i)));
580d0cef73dSGregory Neil Shapiro 
581d0cef73dSGregory Neil Shapiro 				if (ctx->ctx_sd == pfd[i].fd)
582d0cef73dSGregory Neil Shapiro 				{
583d0cef73dSGregory Neil Shapiro 
584d0cef73dSGregory Neil Shapiro 					POOL_LEV_DPRINTF(4,
585d0cef73dSGregory Neil Shapiro 						("TASK: found %d for fd[%d]=%d",
586d0cef73dSGregory Neil Shapiro 						ctx->ctx_sid, i, WAIT_FD(i)));
587d0cef73dSGregory Neil Shapiro 
588d0cef73dSGregory Neil Shapiro 					if (Tskmgr.tm_nb_idle > 0)
589d0cef73dSGregory Neil Shapiro 					{
590d0cef73dSGregory Neil Shapiro 						ctx->ctx_wstate = WKST_READY_TO_RUN;
591d0cef73dSGregory Neil Shapiro 						TASKMGR_COND_SIGNAL();
592d0cef73dSGregory Neil Shapiro 					}
593d0cef73dSGregory Neil Shapiro 					else
594d0cef73dSGregory Neil Shapiro 					{
595d0cef73dSGregory Neil Shapiro 						ctx->ctx_wstate = WKST_RUNNING;
596d0cef73dSGregory Neil Shapiro 						LAUNCH_WORKER(ctx);
597d0cef73dSGregory Neil Shapiro 					}
598d0cef73dSGregory Neil Shapiro 					break;
599d0cef73dSGregory Neil Shapiro 				}
600d0cef73dSGregory Neil Shapiro 			}
601ba00ec3dSGregory Neil Shapiro 			TASKMGR_UNLOCK();
602d0cef73dSGregory Neil Shapiro 
603d0cef73dSGregory Neil Shapiro 			POOL_LEV_DPRINTF(4,
604d0cef73dSGregory Neil Shapiro 				("TASK %s FOUND - Checking PIPE for fd[%d]",
605d0cef73dSGregory Neil Shapiro 				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
606d0cef73dSGregory Neil Shapiro 		}
607d0cef73dSGregory Neil Shapiro 	}
608d0cef73dSGregory Neil Shapiro 
609d0cef73dSGregory Neil Shapiro   err:
610d0cef73dSGregory Neil Shapiro 	if (pfd != NULL)
611d0cef73dSGregory Neil Shapiro 		free(pfd);
612d0cef73dSGregory Neil Shapiro 
613d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_signature = 0;
614ba00ec3dSGregory Neil Shapiro #if 0
615ba00ec3dSGregory Neil Shapiro 	/*
616ba00ec3dSGregory Neil Shapiro 	**  Do not clean up ctx -- it can cause double-free()s.
617ba00ec3dSGregory Neil Shapiro 	**  The program is shutting down anyway, so it's not worth the trouble.
618ba00ec3dSGregory Neil Shapiro 	**  There is a more complex solution that prevents race conditions
619ba00ec3dSGregory Neil Shapiro 	**  while accessing ctx, but that's maybe for a later version.
620ba00ec3dSGregory Neil Shapiro 	*/
621ba00ec3dSGregory Neil Shapiro 
622d0cef73dSGregory Neil Shapiro 	for (;;)
623d0cef73dSGregory Neil Shapiro 	{
624d0cef73dSGregory Neil Shapiro 		SMFICTX_PTR ctx;
625d0cef73dSGregory Neil Shapiro 
626d0cef73dSGregory Neil Shapiro 		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
627d0cef73dSGregory Neil Shapiro 		if (ctx == NULL)
628d0cef73dSGregory Neil Shapiro 			break;
629d0cef73dSGregory Neil Shapiro 		mi_close_session(ctx);
630d0cef73dSGregory Neil Shapiro 	}
631ba00ec3dSGregory Neil Shapiro #endif
632d0cef73dSGregory Neil Shapiro 
633d0cef73dSGregory Neil Shapiro 	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
634d0cef73dSGregory Neil Shapiro 	(void) scond_destroy(&Tskmgr.tm_w_cond);
635d0cef73dSGregory Neil Shapiro 
636d0cef73dSGregory Neil Shapiro 	return NULL;
637d0cef73dSGregory Neil Shapiro }
638d0cef73dSGregory Neil Shapiro 
639d0cef73dSGregory Neil Shapiro /*
640d0cef73dSGregory Neil Shapiro **  Look for a task ready to run.
641d0cef73dSGregory Neil Shapiro **  Value of ctx is NULL or a pointer to a task ready to run.
642d0cef73dSGregory Neil Shapiro */
643d0cef73dSGregory Neil Shapiro 
644d0cef73dSGregory Neil Shapiro #define GET_TASK_READY_TO_RUN()					\
645d0cef73dSGregory Neil Shapiro 	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
646d0cef73dSGregory Neil Shapiro 	{							\
647d0cef73dSGregory Neil Shapiro 		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
648d0cef73dSGregory Neil Shapiro 		{						\
649d0cef73dSGregory Neil Shapiro 			ctx->ctx_wstate = WKST_RUNNING;		\
650d0cef73dSGregory Neil Shapiro 			break;					\
651d0cef73dSGregory Neil Shapiro 		}						\
652d0cef73dSGregory Neil Shapiro 	}
653d0cef73dSGregory Neil Shapiro 
654d0cef73dSGregory Neil Shapiro /*
655d0cef73dSGregory Neil Shapiro **  MI_WORKER -- worker thread
656d0cef73dSGregory Neil Shapiro **	executes tasks distributed by the mi_pool_controller
657d0cef73dSGregory Neil Shapiro **	or by mi_start_session
658d0cef73dSGregory Neil Shapiro **
659d0cef73dSGregory Neil Shapiro **	Parameters:
660d0cef73dSGregory Neil Shapiro **		arg -- pointer to context structure
661d0cef73dSGregory Neil Shapiro **
662d0cef73dSGregory Neil Shapiro **	Returns:
663d0cef73dSGregory Neil Shapiro **		NULL pointer
664d0cef73dSGregory Neil Shapiro */
665d0cef73dSGregory Neil Shapiro 
666d0cef73dSGregory Neil Shapiro static void *
mi_worker(arg)667d0cef73dSGregory Neil Shapiro mi_worker(arg)
668d0cef73dSGregory Neil Shapiro 	void *arg;
669d0cef73dSGregory Neil Shapiro {
670d0cef73dSGregory Neil Shapiro 	SMFICTX_PTR ctx;
671d0cef73dSGregory Neil Shapiro 	bool done;
672d0cef73dSGregory Neil Shapiro 	sthread_t t_id;
673d0cef73dSGregory Neil Shapiro 	int r;
674d0cef73dSGregory Neil Shapiro 
675d0cef73dSGregory Neil Shapiro 	ctx = (SMFICTX_PTR) arg;
676d0cef73dSGregory Neil Shapiro 	done = false;
677d0cef73dSGregory Neil Shapiro 	if (ctx != NULL)
678d0cef73dSGregory Neil Shapiro 		ctx->ctx_wstate = WKST_RUNNING;
679d0cef73dSGregory Neil Shapiro 
680d0cef73dSGregory Neil Shapiro 	t_id = sthread_get_id();
681d0cef73dSGregory Neil Shapiro 	if (pthread_detach(t_id) != 0)
682d0cef73dSGregory Neil Shapiro 	{
683d0cef73dSGregory Neil Shapiro 		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
684d0cef73dSGregory Neil Shapiro 		if (ctx != NULL)
685d0cef73dSGregory Neil Shapiro 			ctx->ctx_wstate = WKST_READY_TO_RUN;
686d0cef73dSGregory Neil Shapiro 		return NULL;
687d0cef73dSGregory Neil Shapiro 	}
688d0cef73dSGregory Neil Shapiro 
689d0cef73dSGregory Neil Shapiro 	TASKMGR_LOCK();
690d0cef73dSGregory Neil Shapiro 	Tskmgr.tm_nb_workers++;
691d0cef73dSGregory Neil Shapiro 	TASKMGR_UNLOCK();
692d0cef73dSGregory Neil Shapiro 
693d0cef73dSGregory Neil Shapiro 	while (!done)
694d0cef73dSGregory Neil Shapiro 	{
695d0cef73dSGregory Neil Shapiro 		if (mi_stop() != MILTER_CONT)
696d0cef73dSGregory Neil Shapiro 			break;
697d0cef73dSGregory Neil Shapiro 
698d0cef73dSGregory Neil Shapiro 		/* let's handle next task... */
699d0cef73dSGregory Neil Shapiro 		if (ctx != NULL)
700d0cef73dSGregory Neil Shapiro 		{
701d0cef73dSGregory Neil Shapiro 			int res;
702d0cef73dSGregory Neil Shapiro 
703d0cef73dSGregory Neil Shapiro 			POOL_LEV_DPRINTF(4,
704d0cef73dSGregory Neil Shapiro 				("worker %d: new task -> let's handle it",
705d0cef73dSGregory Neil Shapiro 				t_id));
706d0cef73dSGregory Neil Shapiro 			res = mi_engine(ctx);
707d0cef73dSGregory Neil Shapiro 			POOL_LEV_DPRINTF(4,
708d0cef73dSGregory Neil Shapiro 				("worker %d: mi_engine returned %d", t_id, res));
709d0cef73dSGregory Neil Shapiro 
710d0cef73dSGregory Neil Shapiro 			TASKMGR_LOCK();
711d0cef73dSGregory Neil Shapiro 			if (res != MI_CONTINUE)
712d0cef73dSGregory Neil Shapiro 			{
713d0cef73dSGregory Neil Shapiro 				ctx->ctx_wstate = WKST_CLOSING;
714d0cef73dSGregory Neil Shapiro 
715d0cef73dSGregory Neil Shapiro 				/*
716d0cef73dSGregory Neil Shapiro 				**  Delete context from linked list of
717d0cef73dSGregory Neil Shapiro 				**  sessions and close session.
718d0cef73dSGregory Neil Shapiro 				*/
719d0cef73dSGregory Neil Shapiro 
720d0cef73dSGregory Neil Shapiro 				mi_close_session(ctx);
721d0cef73dSGregory Neil Shapiro 			}
722d0cef73dSGregory Neil Shapiro 			else
723d0cef73dSGregory Neil Shapiro 			{
724d0cef73dSGregory Neil Shapiro 				ctx->ctx_wstate = WKST_READY_TO_WAIT;
725d0cef73dSGregory Neil Shapiro 
726d0cef73dSGregory Neil Shapiro 				POOL_LEV_DPRINTF(4,
727d0cef73dSGregory Neil Shapiro 					("writing to event pipe..."));
728d0cef73dSGregory Neil Shapiro 
729d0cef73dSGregory Neil Shapiro 				/*
730d0cef73dSGregory Neil Shapiro 				**  Signal task controller to add new session
731d0cef73dSGregory Neil Shapiro 				**  to poll set.
732d0cef73dSGregory Neil Shapiro 				*/
733d0cef73dSGregory Neil Shapiro 
734d0cef73dSGregory Neil Shapiro 				PIPE_SEND_SIGNAL();
735d0cef73dSGregory Neil Shapiro 			}
736d0cef73dSGregory Neil Shapiro 			TASKMGR_UNLOCK();
737d0cef73dSGregory Neil Shapiro 			ctx = NULL;
738d0cef73dSGregory Neil Shapiro 
739d0cef73dSGregory Neil Shapiro 		}
740d0cef73dSGregory Neil Shapiro 
741d0cef73dSGregory Neil Shapiro 		/* check if there is any task waiting to be served */
742d0cef73dSGregory Neil Shapiro 		TASKMGR_LOCK();
743d0cef73dSGregory Neil Shapiro 
744d0cef73dSGregory Neil Shapiro 		GET_TASK_READY_TO_RUN();
745d0cef73dSGregory Neil Shapiro 
746d0cef73dSGregory Neil Shapiro 		/* Got a task? */
747d0cef73dSGregory Neil Shapiro 		if (ctx != NULL)
748d0cef73dSGregory Neil Shapiro 		{
749d0cef73dSGregory Neil Shapiro 			TASKMGR_UNLOCK();
750d0cef73dSGregory Neil Shapiro 			continue;
751d0cef73dSGregory Neil Shapiro 		}
752d0cef73dSGregory Neil Shapiro 
753d0cef73dSGregory Neil Shapiro 		/*
754d0cef73dSGregory Neil Shapiro 		**  if not, let's check if there is enough idle workers
755d0cef73dSGregory Neil Shapiro 		**	if yes: quit
756d0cef73dSGregory Neil Shapiro 		*/
757d0cef73dSGregory Neil Shapiro 
758d0cef73dSGregory Neil Shapiro 		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
759d0cef73dSGregory Neil Shapiro 		    Tskmgr.tm_nb_idle > MIN_IDLE)
760d0cef73dSGregory Neil Shapiro 			done = true;
761d0cef73dSGregory Neil Shapiro 
762d0cef73dSGregory Neil Shapiro 		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
763d0cef73dSGregory Neil Shapiro 			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
764d0cef73dSGregory Neil Shapiro 
765d0cef73dSGregory Neil Shapiro 		if (done)
766d0cef73dSGregory Neil Shapiro 		{
767d0cef73dSGregory Neil Shapiro 			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
768d0cef73dSGregory Neil Shapiro 			Tskmgr.tm_nb_workers--;
769d0cef73dSGregory Neil Shapiro 			TASKMGR_UNLOCK();
770d0cef73dSGregory Neil Shapiro 			continue;
771d0cef73dSGregory Neil Shapiro 		}
772d0cef73dSGregory Neil Shapiro 
773d0cef73dSGregory Neil Shapiro 		/*
774d0cef73dSGregory Neil Shapiro 		**  if no task ready to run, wait for another one
775d0cef73dSGregory Neil Shapiro 		*/
776d0cef73dSGregory Neil Shapiro 
777d0cef73dSGregory Neil Shapiro 		Tskmgr.tm_nb_idle++;
778d0cef73dSGregory Neil Shapiro 		TASKMGR_COND_WAIT();
779d0cef73dSGregory Neil Shapiro 		Tskmgr.tm_nb_idle--;
780d0cef73dSGregory Neil Shapiro 
781d0cef73dSGregory Neil Shapiro 		/* look for a task */
782d0cef73dSGregory Neil Shapiro 		GET_TASK_READY_TO_RUN();
783d0cef73dSGregory Neil Shapiro 
784d0cef73dSGregory Neil Shapiro 		TASKMGR_UNLOCK();
785d0cef73dSGregory Neil Shapiro 	}
786d0cef73dSGregory Neil Shapiro 	return NULL;
787d0cef73dSGregory Neil Shapiro }
788d0cef73dSGregory Neil Shapiro 
789d0cef73dSGregory Neil Shapiro /*
790d0cef73dSGregory Neil Shapiro **  MI_LIST_ADD_CTX -- add new session to linked list
791d0cef73dSGregory Neil Shapiro **
792d0cef73dSGregory Neil Shapiro **	Parameters:
793d0cef73dSGregory Neil Shapiro **		ctx -- context structure
794d0cef73dSGregory Neil Shapiro **
795d0cef73dSGregory Neil Shapiro **	Returns:
796d0cef73dSGregory Neil Shapiro **		MI_FAILURE/MI_SUCCESS
797d0cef73dSGregory Neil Shapiro */
798d0cef73dSGregory Neil Shapiro 
799d0cef73dSGregory Neil Shapiro static int
mi_list_add_ctx(ctx)800d0cef73dSGregory Neil Shapiro mi_list_add_ctx(ctx)
801d0cef73dSGregory Neil Shapiro 	SMFICTX_PTR ctx;
802d0cef73dSGregory Neil Shapiro {
803d0cef73dSGregory Neil Shapiro 	SM_ASSERT(ctx != NULL);
804d0cef73dSGregory Neil Shapiro 	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
805d0cef73dSGregory Neil Shapiro 	return MI_SUCCESS;
806d0cef73dSGregory Neil Shapiro }
807d0cef73dSGregory Neil Shapiro 
808d0cef73dSGregory Neil Shapiro /*
809d0cef73dSGregory Neil Shapiro **  MI_LIST_DEL_CTX -- remove session from linked list when finished
810d0cef73dSGregory Neil Shapiro **
811d0cef73dSGregory Neil Shapiro **	Parameters:
812d0cef73dSGregory Neil Shapiro **		ctx -- context structure
813d0cef73dSGregory Neil Shapiro **
814d0cef73dSGregory Neil Shapiro **	Returns:
815d0cef73dSGregory Neil Shapiro **		MI_FAILURE/MI_SUCCESS
816d0cef73dSGregory Neil Shapiro */
817d0cef73dSGregory Neil Shapiro 
818d0cef73dSGregory Neil Shapiro static int
mi_list_del_ctx(ctx)819d0cef73dSGregory Neil Shapiro mi_list_del_ctx(ctx)
820d0cef73dSGregory Neil Shapiro 	SMFICTX_PTR ctx;
821d0cef73dSGregory Neil Shapiro {
822d0cef73dSGregory Neil Shapiro 	SM_ASSERT(ctx != NULL);
823d0cef73dSGregory Neil Shapiro 	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
824d0cef73dSGregory Neil Shapiro 		return MI_FAILURE;
825d0cef73dSGregory Neil Shapiro 
826d0cef73dSGregory Neil Shapiro 	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
827d0cef73dSGregory Neil Shapiro 	return MI_SUCCESS;
828d0cef73dSGregory Neil Shapiro }
829d0cef73dSGregory Neil Shapiro #endif /* _FFR_WORKERS_POOL */
830