xref: /onnv-gate/usr/src/cmd/sendmail/libmilter/worker.c (revision 11440:802724e2906a)
13544Sjbeck /*
2*11440SJohn.Beck@Sun.COM  *  Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
33544Sjbeck  *	All rights reserved.
43544Sjbeck  *
53544Sjbeck  * By using this file, you agree to the terms and conditions set
63544Sjbeck  * forth in the LICENSE file which can be found at the top level of
73544Sjbeck  * the sendmail distribution.
83544Sjbeck  *
93544Sjbeck  * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
103544Sjbeck  *   Jose-Marcio.Martins@ensmp.fr
113544Sjbeck  */
123544Sjbeck 
133544Sjbeck #include <sm/gen.h>
14*11440SJohn.Beck@Sun.COM SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")
153544Sjbeck 
163544Sjbeck #include "libmilter.h"
173544Sjbeck 
183544Sjbeck #if _FFR_WORKERS_POOL
193544Sjbeck 
203544Sjbeck typedef struct taskmgr_S taskmgr_T;
213544Sjbeck 
223544Sjbeck #define TM_SIGNATURE		0x23021957
233544Sjbeck 
243544Sjbeck struct taskmgr_S
253544Sjbeck {
263544Sjbeck 	long		tm_signature; /* has the controller been initialized */
273544Sjbeck 	sthread_t	tm_tid;	/* thread id of controller */
283544Sjbeck 	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
293544Sjbeck 
303544Sjbeck 	int		tm_nb_workers;	/* number of workers in the pool */
313544Sjbeck 	int		tm_nb_idle;	/* number of workers waiting */
323544Sjbeck 
333544Sjbeck 	int		tm_p[2];	/* poll control pipe */
343544Sjbeck 
353544Sjbeck 	smutex_t	tm_w_mutex;	/* linked list access mutex */
363544Sjbeck 	scond_t		tm_w_cond;	/* */
373544Sjbeck };
383544Sjbeck 
393544Sjbeck static taskmgr_T     Tskmgr = {0};
403544Sjbeck 
/* list of all sessions known to the task manager */
#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head

/* read and write end of the pipe used to wake up the controller */
#define RD_PIPE	(Tskmgr.tm_p[0])
#define WR_PIPE	(Tskmgr.tm_p[1])

/*
**  PIPE_SEND_SIGNAL -- write a single byte to the control pipe.
**	Anything but a complete write is logged; no other action is taken.
*/

#define PIPE_SEND_SIGNAL()						\
	do								\
	{								\
		char wake_byte = 0x5a;					\
		int wake_fd = WR_PIPE;					\
									\
		if (write(wake_fd, &wake_byte, sizeof(wake_byte)) !=	\
		    sizeof(wake_byte))					\
			smi_log(SMI_LOG_ERR,				\
				"Error writing to event pipe: %s",	\
				sm_errstring(errno));			\
	} while (0)

#ifndef USE_PIPE_WAKE_POLL
# define USE_PIPE_WAKE_POLL 1
#endif /* USE_PIPE_WAKE_POLL */

/* timeout handed to poll() by the controller (10000 ms = 10 s) */
#define POLL_TIMEOUT   10000

/* timeout of the conditional wait of an idle worker (10 s) */
#define COND_TIMEOUT     10
663544Sjbeck 
673544Sjbeck /* functions */
683544Sjbeck static int mi_close_session __P((SMFICTX_PTR));
693544Sjbeck 
703544Sjbeck static void *mi_worker __P((void *));
713544Sjbeck static void *mi_pool_controller __P((void *));
723544Sjbeck 
733544Sjbeck static int mi_list_add_ctx __P((SMFICTX_PTR));
743544Sjbeck static int mi_list_del_ctx __P((SMFICTX_PTR));
753544Sjbeck 
763544Sjbeck /*
773544Sjbeck **  periodicity of cleaning up old sessions (timedout)
783544Sjbeck **	sessions list will be checked to find old inactive
793544Sjbeck **	sessions each DT_CHECK_OLD_SESSIONS sec
803544Sjbeck */
813544Sjbeck 
823544Sjbeck #define DT_CHECK_OLD_SESSIONS   600
833544Sjbeck 
843544Sjbeck #ifndef OLD_SESSION_TIMEOUT
853544Sjbeck # define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
863544Sjbeck #endif /* OLD_SESSION_TIMEOUT */
873544Sjbeck 
883544Sjbeck /* session states - with respect to the pool of workers */
893544Sjbeck #define WKST_INIT		0	/* initial state */
903544Sjbeck #define WKST_READY_TO_RUN	1	/* command ready do be read */
913544Sjbeck #define WKST_RUNNING		2	/* session running on a worker */
923544Sjbeck #define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
933544Sjbeck #define WKST_WAITING		4	/* waiting for new command */
943544Sjbeck #define WKST_CLOSING		5	/* session finished */
953544Sjbeck 
963544Sjbeck #ifndef MIN_WORKERS
973544Sjbeck # define MIN_WORKERS	2  /* minimum number of threads to keep around */
983544Sjbeck #endif
993544Sjbeck 
1003544Sjbeck #define MIN_IDLE	1  /* minimum number of idle threads */
1013544Sjbeck 
1023544Sjbeck 
/*
**  Macros for threads and mutex management
**
**	All of them operate on the global task manager (Tskmgr).
**	Failures are logged only; none of the macros yields a result
**	except TASKMGR_COND_WAIT, which is the plain expression.
*/

/* acquire the task manager mutex */
#define TASKMGR_LOCK()							\
	do								\
	{								\
		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
	} while (0)

/* release the task manager mutex */
#define TASKMGR_UNLOCK()						\
	do								\
	{								\
		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
	} while (0)

/* timed wait on the worker condition; the mutex must be held */
#define	TASKMGR_COND_WAIT()						\
	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)

/* wake up a worker waiting on the condition */
#define	TASKMGR_COND_SIGNAL()						\
	do								\
	{								\
		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
	} while (0)

/*
**  LAUNCH_WORKER -- start a new worker thread for ctx.
**	A failure to create the thread is logged but not reported
**	to the code using the macro.
*/

#define LAUNCH_WORKER(ctx)						\
	do								\
	{								\
		int lw_ret;						\
		sthread_t lw_tid;					\
									\
		lw_ret = thread_create(&lw_tid, mi_worker, ctx);	\
		if (lw_ret != 0)					\
			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
				sm_errstring(lw_ret));			\
	} while (0)

/*
**  POOL_LEV_DPRINTF -- debug output, compiled in only if POOL_DEBUG.
**	x is a parenthesized argument list for sm_dprintf().
**	The debug level is read from "ctx", hence a variable of that
**	name must be in scope where the macro is used.
*/

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)					\
	do {								\
		if ((lev) < ctx->ctx_dbg)				\
			sm_dprintf x;					\
	} while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */
1513544Sjbeck 
1523544Sjbeck /*
1533544Sjbeck **  MI_START_SESSION -- Start a session in the pool of workers
1543544Sjbeck **
1553544Sjbeck **	Parameters:
1563544Sjbeck **		ctx -- context structure
1573544Sjbeck **
1583544Sjbeck **	Returns:
1593544Sjbeck **		MI_SUCCESS/MI_FAILURE
1603544Sjbeck */
1613544Sjbeck 
1623544Sjbeck int
mi_start_session(ctx)1633544Sjbeck mi_start_session(ctx)
1643544Sjbeck 	SMFICTX_PTR ctx;
1653544Sjbeck {
1663544Sjbeck 	static long id = 0;
1673544Sjbeck 
1683544Sjbeck 	SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
1693544Sjbeck 	SM_ASSERT(ctx != NULL);
1703544Sjbeck 	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
1713544Sjbeck 	TASKMGR_LOCK();
1723544Sjbeck 
1733544Sjbeck 	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
1743544Sjbeck 	{
1753544Sjbeck 		TASKMGR_UNLOCK();
1763544Sjbeck 		return MI_FAILURE;
1773544Sjbeck 	}
1783544Sjbeck 
1793544Sjbeck 	ctx->ctx_sid = id++;
1803544Sjbeck 
1813544Sjbeck 	/* if there is an idle worker, signal it, otherwise start new worker */
1823544Sjbeck 	if (Tskmgr.tm_nb_idle > 0)
1833544Sjbeck 	{
1843544Sjbeck 		ctx->ctx_wstate = WKST_READY_TO_RUN;
1853544Sjbeck 		TASKMGR_COND_SIGNAL();
1863544Sjbeck 	}
1873544Sjbeck 	else
1883544Sjbeck 	{
1893544Sjbeck 		ctx->ctx_wstate = WKST_RUNNING;
1903544Sjbeck 		LAUNCH_WORKER(ctx);
1913544Sjbeck 	}
1923544Sjbeck 	TASKMGR_UNLOCK();
1933544Sjbeck 	return MI_SUCCESS;
1943544Sjbeck }
1953544Sjbeck 
1963544Sjbeck /*
1973544Sjbeck **  MI_CLOSE_SESSION -- Close a session and clean up data structures
1983544Sjbeck **
1993544Sjbeck **	Parameters:
2003544Sjbeck **		ctx -- context structure
2013544Sjbeck **
2023544Sjbeck **	Returns:
2033544Sjbeck **		MI_SUCCESS/MI_FAILURE
2043544Sjbeck */
2053544Sjbeck 
2063544Sjbeck static int
mi_close_session(ctx)2073544Sjbeck mi_close_session(ctx)
2083544Sjbeck 	SMFICTX_PTR ctx;
2093544Sjbeck {
2103544Sjbeck 	SM_ASSERT(ctx != NULL);
2113544Sjbeck 
2123544Sjbeck 	(void) mi_list_del_ctx(ctx);
213*11440SJohn.Beck@Sun.COM 	mi_clr_ctx(ctx);
2143544Sjbeck 
2153544Sjbeck 	return MI_SUCCESS;
2163544Sjbeck }
2173544Sjbeck 
2183544Sjbeck /*
2193544Sjbeck **  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
2203544Sjbeck **		Must be called before starting sessions.
2213544Sjbeck **
2223544Sjbeck **	Parameters:
2233544Sjbeck **		none
2243544Sjbeck **
2253544Sjbeck **	Returns:
2263544Sjbeck **		MI_SUCCESS/MI_FAILURE
2273544Sjbeck */
2283544Sjbeck 
2293544Sjbeck int
mi_pool_controller_init()2303544Sjbeck mi_pool_controller_init()
2313544Sjbeck {
2323544Sjbeck 	sthread_t tid;
2333544Sjbeck 	int r, i;
2343544Sjbeck 
2353544Sjbeck 	if (Tskmgr.tm_signature == TM_SIGNATURE)
2363544Sjbeck 		return MI_SUCCESS;
2373544Sjbeck 
2383544Sjbeck 	SM_TAILQ_INIT(&WRK_CTX_HEAD);
2393544Sjbeck 	Tskmgr.tm_tid = (sthread_t) -1;
2403544Sjbeck 	Tskmgr.tm_nb_workers = 0;
2413544Sjbeck 	Tskmgr.tm_nb_idle = 0;
2423544Sjbeck 
2433544Sjbeck 	if (pipe(Tskmgr.tm_p) != 0)
2443544Sjbeck 	{
2453544Sjbeck 		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
246*11440SJohn.Beck@Sun.COM 			sm_errstring(errno));
2473544Sjbeck 		return MI_FAILURE;
2483544Sjbeck 	}
2493544Sjbeck 
2503544Sjbeck 	(void) smutex_init(&Tskmgr.tm_w_mutex);
2513544Sjbeck 	(void) scond_init(&Tskmgr.tm_w_cond);
2523544Sjbeck 
2533544Sjbeck 	/* Launch the pool controller */
2543544Sjbeck 	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
2553544Sjbeck 	{
2563544Sjbeck 		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
2573544Sjbeck 			sm_errstring(r));
2583544Sjbeck 		return MI_FAILURE;
2593544Sjbeck 	}
2603544Sjbeck 	Tskmgr.tm_tid = tid;
2613544Sjbeck 	Tskmgr.tm_signature = TM_SIGNATURE;
2623544Sjbeck 
2633544Sjbeck 	/* Create the pool of workers */
2643544Sjbeck 	for (i = 0; i < MIN_WORKERS; i++)
2653544Sjbeck 	{
2663544Sjbeck 		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
2673544Sjbeck 		{
2683544Sjbeck 			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
2693544Sjbeck 				sm_errstring(r));
2703544Sjbeck 			return MI_FAILURE;
2713544Sjbeck 		}
2723544Sjbeck 	}
2733544Sjbeck 
2743544Sjbeck 	return MI_SUCCESS;
2753544Sjbeck }
2763544Sjbeck 
2773544Sjbeck /*
2783544Sjbeck **  MI_POOL_CONTROLLER -- manage the pool of workers
2793544Sjbeck **	This thread must be running when listener begins
2803544Sjbeck **	starting sessions
2813544Sjbeck **
2823544Sjbeck **	Parameters:
2833544Sjbeck **		arg -- unused
2843544Sjbeck **
2853544Sjbeck **	Returns:
2863544Sjbeck **		NULL
2873544Sjbeck **
2883544Sjbeck **	Control flow:
2893544Sjbeck **		for (;;)
2903544Sjbeck **			Look for timed out sessions
2913544Sjbeck **			Select sessions to wait for sendmail command
2923544Sjbeck **			Poll set of file descriptors
2933544Sjbeck **			if timeout
2943544Sjbeck **				continue
2953544Sjbeck **			For each file descriptor ready
2963544Sjbeck **				launch new thread if no worker available
2973544Sjbeck **				else
2983544Sjbeck **				signal waiting worker
2993544Sjbeck */
3003544Sjbeck 
3013544Sjbeck /* Poll structure array (pollfd) size step */
3023544Sjbeck #define PFD_STEP	256
3033544Sjbeck 
3043544Sjbeck #define WAIT_FD(i)	(pfd[i].fd)
3053544Sjbeck #define WAITFN		"POLL"
3063544Sjbeck 
3073544Sjbeck static void *
mi_pool_controller(arg)3083544Sjbeck mi_pool_controller(arg)
3093544Sjbeck 	void *arg;
3103544Sjbeck {
3113544Sjbeck 	struct pollfd *pfd = NULL;
3123544Sjbeck 	int dim_pfd = 0;
3133544Sjbeck 	bool rebuild_set = true;
3143544Sjbeck 	int pcnt = 0; /* error count for poll() failures */
315*11440SJohn.Beck@Sun.COM 	time_t lastcheck;
3163544Sjbeck 
3173544Sjbeck 	Tskmgr.tm_tid = sthread_get_id();
3183544Sjbeck 	if (pthread_detach(Tskmgr.tm_tid) != 0)
3193544Sjbeck 	{
3203544Sjbeck 		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
3213544Sjbeck 		return NULL;
3223544Sjbeck 	}
3233544Sjbeck 
3243544Sjbeck 	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
3253544Sjbeck 	if (pfd == NULL)
3263544Sjbeck 	{
3273544Sjbeck 		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
3283544Sjbeck 			sm_errstring(errno));
3293544Sjbeck 		return NULL;
3303544Sjbeck 	}
3313544Sjbeck 	dim_pfd = PFD_STEP;
3323544Sjbeck 
333*11440SJohn.Beck@Sun.COM 	lastcheck = time(NULL);
3343544Sjbeck 	for (;;)
3353544Sjbeck 	{
3363544Sjbeck 		SMFICTX_PTR ctx;
3373544Sjbeck 		int nfd, rfd, i;
3383544Sjbeck 		time_t now;
3393544Sjbeck 
3403544Sjbeck 		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
3413544Sjbeck 
3423544Sjbeck 		if (mi_stop() != MILTER_CONT)
3433544Sjbeck 			break;
3443544Sjbeck 
3453544Sjbeck 		TASKMGR_LOCK();
3463544Sjbeck 
3473544Sjbeck 		now = time(NULL);
3483544Sjbeck 
3493544Sjbeck 		/* check for timed out sessions? */
3503544Sjbeck 		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
3513544Sjbeck 		{
352*11440SJohn.Beck@Sun.COM 			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
353*11440SJohn.Beck@Sun.COM 			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
3543544Sjbeck 			{
355*11440SJohn.Beck@Sun.COM 				SMFICTX_PTR ctx_nxt;
356*11440SJohn.Beck@Sun.COM 
357*11440SJohn.Beck@Sun.COM 				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
3583544Sjbeck 				if (ctx->ctx_wstate == WKST_WAITING)
3593544Sjbeck 				{
3603544Sjbeck 					if (ctx->ctx_wait == 0)
361*11440SJohn.Beck@Sun.COM 						ctx->ctx_wait = now;
362*11440SJohn.Beck@Sun.COM 					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
363*11440SJohn.Beck@Sun.COM 						 < now)
3643544Sjbeck 					{
365*11440SJohn.Beck@Sun.COM 						/* if session timed out, close it */
3663544Sjbeck 						sfsistat (*fi_close) __P((SMFICTX *));
3673544Sjbeck 
3683544Sjbeck 						POOL_LEV_DPRINTF(4,
3693544Sjbeck 							("Closing old connection: sd=%d id=%d",
3703544Sjbeck 							ctx->ctx_sd,
3713544Sjbeck 							ctx->ctx_sid));
3723544Sjbeck 
3733544Sjbeck 						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
3743544Sjbeck 							(void) (*fi_close)(ctx);
3753544Sjbeck 
3763544Sjbeck 						mi_close_session(ctx);
3773544Sjbeck 					}
3783544Sjbeck 				}
379*11440SJohn.Beck@Sun.COM 				ctx = ctx_nxt;
3803544Sjbeck 			}
3813544Sjbeck 			lastcheck = now;
3823544Sjbeck 		}
3833544Sjbeck 
3843544Sjbeck 		if (rebuild_set)
3853544Sjbeck 		{
3863544Sjbeck 			/*
3873544Sjbeck 			**  Initialize poll set.
3883544Sjbeck 			**  Insert into the poll set the file descriptors of
3893544Sjbeck 			**  all sessions waiting for a command from sendmail.
3903544Sjbeck 			*/
3913544Sjbeck 
3923544Sjbeck 			nfd = 0;
3933544Sjbeck 
3943544Sjbeck 			/* begin with worker pipe */
3953544Sjbeck 			pfd[nfd].fd = RD_PIPE;
3963544Sjbeck 			pfd[nfd].events = MI_POLL_RD_FLAGS;
3973544Sjbeck 			pfd[nfd].revents = 0;
3983544Sjbeck 			nfd++;
3993544Sjbeck 
4003544Sjbeck 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
4013544Sjbeck 			{
4023544Sjbeck 				/*
4033544Sjbeck 				**  update ctx_wait - start of wait moment -
4043544Sjbeck 				**  for timeout
4053544Sjbeck 				*/
4063544Sjbeck 
4073544Sjbeck 				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
4083544Sjbeck 					ctx->ctx_wait = now;
4093544Sjbeck 
4103544Sjbeck 				/* add the session to the pollfd array? */
4113544Sjbeck 				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
4123544Sjbeck 				    (ctx->ctx_wstate == WKST_WAITING))
4133544Sjbeck 				{
4143544Sjbeck 					/*
4153544Sjbeck 					**  Resize the pollfd array if it
4163544Sjbeck 					**  isn't large enough.
4173544Sjbeck 					*/
4183544Sjbeck 
4193544Sjbeck 					if (nfd >= dim_pfd)
4203544Sjbeck 					{
4213544Sjbeck 						struct pollfd *tpfd;
4223544Sjbeck 						size_t new;
4233544Sjbeck 
4243544Sjbeck 						new = (dim_pfd + PFD_STEP) *
4253544Sjbeck 							sizeof(*tpfd);
4263544Sjbeck 						tpfd = (struct pollfd *)
4273544Sjbeck 							realloc(pfd, new);
4283544Sjbeck 						if (tpfd != NULL)
4293544Sjbeck 						{
4303544Sjbeck 							pfd = tpfd;
4313544Sjbeck 							dim_pfd += PFD_STEP;
4323544Sjbeck 						}
4333544Sjbeck 						else
4343544Sjbeck 						{
4353544Sjbeck 							smi_log(SMI_LOG_ERR,
4363544Sjbeck 								"Failed to realloc pollfd array:%s",
4373544Sjbeck 								sm_errstring(errno));
4383544Sjbeck 						}
4393544Sjbeck 					}
4403544Sjbeck 
4413544Sjbeck 					/* add the session to pollfd array */
4423544Sjbeck 					if (nfd < dim_pfd)
4433544Sjbeck 					{
4443544Sjbeck 						ctx->ctx_wstate = WKST_WAITING;
4453544Sjbeck 						pfd[nfd].fd = ctx->ctx_sd;
4463544Sjbeck 						pfd[nfd].events = MI_POLL_RD_FLAGS;
4473544Sjbeck 						pfd[nfd].revents = 0;
4483544Sjbeck 						nfd++;
4493544Sjbeck 					}
4503544Sjbeck 				}
4513544Sjbeck 			}
452*11440SJohn.Beck@Sun.COM 			rebuild_set = false;
4533544Sjbeck 		}
4543544Sjbeck 
4553544Sjbeck 		TASKMGR_UNLOCK();
4563544Sjbeck 
4573544Sjbeck 		/* Everything is ready, let's wait for an event */
4583544Sjbeck 		rfd = poll(pfd, nfd, POLL_TIMEOUT);
4593544Sjbeck 
4603544Sjbeck 		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
4613544Sjbeck 			WAITFN, now, nfd));
4623544Sjbeck 
4633544Sjbeck 		/* timeout */
4643544Sjbeck 		if (rfd == 0)
4653544Sjbeck 			continue;
4663544Sjbeck 
4673544Sjbeck 		rebuild_set = true;
4683544Sjbeck 
4693544Sjbeck 		/* error */
4703544Sjbeck 		if (rfd < 0)
4713544Sjbeck 		{
4723544Sjbeck 			if (errno == EINTR)
4733544Sjbeck 				continue;
4743544Sjbeck 			pcnt++;
4753544Sjbeck 			smi_log(SMI_LOG_ERR,
4763544Sjbeck 				"%s() failed (%s), %s",
4773544Sjbeck 				WAITFN, sm_errstring(errno),
4783544Sjbeck 				pcnt >= MAX_FAILS_S ? "abort" : "try again");
4793544Sjbeck 
4803544Sjbeck 			if (pcnt >= MAX_FAILS_S)
4813544Sjbeck 				goto err;
4823544Sjbeck 		}
4833544Sjbeck 		pcnt = 0;
4843544Sjbeck 
4853544Sjbeck 		/* something happened */
4863544Sjbeck 		for (i = 0; i < nfd; i++)
4873544Sjbeck 		{
4883544Sjbeck 			if (pfd[i].revents == 0)
4893544Sjbeck 				continue;
4903544Sjbeck 
4913544Sjbeck 			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
4923544Sjbeck 				WAITFN, i, nfd,
4933544Sjbeck 			WAIT_FD(i)));
4943544Sjbeck 
4953544Sjbeck 			/* has a worker signaled an end of task ? */
4963544Sjbeck 			if (WAIT_FD(i) == RD_PIPE)
4973544Sjbeck 			{
4983544Sjbeck 				char evt = 0;
4993544Sjbeck 				int r = 0;
5003544Sjbeck 
5013544Sjbeck 				POOL_LEV_DPRINTF(4,
5023544Sjbeck 					("PIPE WILL READ evt = %08X %08X",
5033544Sjbeck 					pfd[i].events, pfd[i].revents));
5043544Sjbeck 
5053544Sjbeck 				if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
5063544Sjbeck 				{
5073544Sjbeck 					r = read(RD_PIPE, &evt, sizeof(evt));
5083544Sjbeck 					if (r == sizeof(evt))
5093544Sjbeck 					{
5103544Sjbeck 						/* Do nothing */
5113544Sjbeck 					}
5123544Sjbeck 				}
5133544Sjbeck 
5143544Sjbeck 				POOL_LEV_DPRINTF(4,
5153544Sjbeck 					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
5163544Sjbeck 					i, RD_PIPE, r, evt));
5173544Sjbeck 
5183544Sjbeck 				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
5193544Sjbeck 				{
5203544Sjbeck 					/* Exception handling */
5213544Sjbeck 				}
5223544Sjbeck 				continue;
5233544Sjbeck 			}
5243544Sjbeck 
5253544Sjbeck 			/* no ! sendmail wants to send a command */
5263544Sjbeck 			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
5273544Sjbeck 			{
5283544Sjbeck 				if (ctx->ctx_wstate != WKST_WAITING)
5293544Sjbeck 					continue;
5303544Sjbeck 
5313544Sjbeck 				POOL_LEV_DPRINTF(4,
5323544Sjbeck 					("Checking context sd=%d - fd=%d ",
5333544Sjbeck 					ctx->ctx_sd , WAIT_FD(i)));
5343544Sjbeck 
5353544Sjbeck 				if (ctx->ctx_sd == pfd[i].fd)
5363544Sjbeck 				{
5373544Sjbeck 					TASKMGR_LOCK();
5383544Sjbeck 
5393544Sjbeck 					POOL_LEV_DPRINTF(4,
5403544Sjbeck 						("TASK: found %d for fd[%d]=%d",
5413544Sjbeck 						ctx->ctx_sid, i, WAIT_FD(i)));
5423544Sjbeck 
5433544Sjbeck 					if (Tskmgr.tm_nb_idle > 0)
5443544Sjbeck 					{
5453544Sjbeck 						ctx->ctx_wstate = WKST_READY_TO_RUN;
5463544Sjbeck 						TASKMGR_COND_SIGNAL();
5473544Sjbeck 					}
5483544Sjbeck 					else
5493544Sjbeck 					{
5503544Sjbeck 						ctx->ctx_wstate = WKST_RUNNING;
5513544Sjbeck 						LAUNCH_WORKER(ctx);
5523544Sjbeck 					}
5533544Sjbeck 					TASKMGR_UNLOCK();
5543544Sjbeck 					break;
5553544Sjbeck 				}
5563544Sjbeck 			}
5573544Sjbeck 
5583544Sjbeck 			POOL_LEV_DPRINTF(4,
5593544Sjbeck 				("TASK %s FOUND - Checking PIPE for fd[%d]",
5603544Sjbeck 				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
5613544Sjbeck 		}
5623544Sjbeck 	}
5633544Sjbeck 
5643544Sjbeck   err:
5653544Sjbeck 	if (pfd != NULL)
5663544Sjbeck 		free(pfd);
5673544Sjbeck 
5683544Sjbeck 	Tskmgr.tm_signature = 0;
5693544Sjbeck 	for (;;)
5703544Sjbeck 	{
5713544Sjbeck 		SMFICTX_PTR ctx;
5723544Sjbeck 
5733544Sjbeck 		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
5743544Sjbeck 		if (ctx == NULL)
5753544Sjbeck 			break;
5763544Sjbeck 		mi_close_session(ctx);
5773544Sjbeck 	}
5783544Sjbeck 
5793544Sjbeck 	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
5803544Sjbeck 	(void) scond_destroy(&Tskmgr.tm_w_cond);
5813544Sjbeck 
5823544Sjbeck 	return NULL;
5833544Sjbeck }
5843544Sjbeck 
/*
**  GET_TASK_READY_TO_RUN -- claim the first session that is ready to run.
**	Operates on a variable "ctx" which must be in scope: afterwards
**	it refers to the claimed session (whose state has been changed
**	to WKST_RUNNING) or it is NULL if there is none.
*/

#define GET_TASK_READY_TO_RUN()					\
	do							\
	{							\
		SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)	\
		{						\
			if (ctx->ctx_wstate != WKST_READY_TO_RUN) \
				continue;			\
			ctx->ctx_wstate = WKST_RUNNING;		\
			break;					\
		}						\
	} while (0)
5993544Sjbeck 
6003544Sjbeck /*
6013544Sjbeck **  MI_WORKER -- worker thread
6023544Sjbeck **	executes tasks distributed by the mi_pool_controller
6033544Sjbeck **	or by mi_start_session
6043544Sjbeck **
6053544Sjbeck **	Parameters:
6063544Sjbeck **		arg -- pointer to context structure
6073544Sjbeck **
6083544Sjbeck **	Returns:
6093544Sjbeck **		NULL pointer
6103544Sjbeck */
6113544Sjbeck 
6123544Sjbeck static void *
mi_worker(arg)6133544Sjbeck mi_worker(arg)
6143544Sjbeck 	void *arg;
6153544Sjbeck {
6163544Sjbeck 	SMFICTX_PTR ctx;
6173544Sjbeck 	bool done;
6183544Sjbeck 	sthread_t t_id;
6193544Sjbeck 	int r;
6203544Sjbeck 
6213544Sjbeck 	ctx = (SMFICTX_PTR) arg;
6223544Sjbeck 	done = false;
6233544Sjbeck 	if (ctx != NULL)
6243544Sjbeck 		ctx->ctx_wstate = WKST_RUNNING;
6253544Sjbeck 
6263544Sjbeck 	t_id = sthread_get_id();
6273544Sjbeck 	if (pthread_detach(t_id) != 0)
6283544Sjbeck 	{
6293544Sjbeck 		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
6303544Sjbeck 		if (ctx != NULL)
6313544Sjbeck 			ctx->ctx_wstate = WKST_READY_TO_RUN;
6323544Sjbeck 		return NULL;
6333544Sjbeck 	}
6343544Sjbeck 
6353544Sjbeck 	TASKMGR_LOCK();
6363544Sjbeck 	Tskmgr.tm_nb_workers++;
6373544Sjbeck 	TASKMGR_UNLOCK();
6383544Sjbeck 
6393544Sjbeck 	while (!done)
6403544Sjbeck 	{
6413544Sjbeck 		if (mi_stop() != MILTER_CONT)
6423544Sjbeck 			break;
6433544Sjbeck 
6443544Sjbeck 		/* let's handle next task... */
6453544Sjbeck 		if (ctx != NULL)
6463544Sjbeck 		{
6473544Sjbeck 			int res;
6483544Sjbeck 
6493544Sjbeck 			POOL_LEV_DPRINTF(4,
6503544Sjbeck 				("worker %d: new task -> let's handle it",
6513544Sjbeck 				t_id));
6523544Sjbeck 			res = mi_engine(ctx);
6533544Sjbeck 			POOL_LEV_DPRINTF(4,
6543544Sjbeck 				("worker %d: mi_engine returned %d", t_id, res));
6553544Sjbeck 
6563544Sjbeck 			TASKMGR_LOCK();
6573544Sjbeck 			if (res != MI_CONTINUE)
6583544Sjbeck 			{
6593544Sjbeck 				ctx->ctx_wstate = WKST_CLOSING;
6603544Sjbeck 
6613544Sjbeck 				/*
6623544Sjbeck 				**  Delete context from linked list of
6633544Sjbeck 				**  sessions and close session.
6643544Sjbeck 				*/
6653544Sjbeck 
6663544Sjbeck 				mi_close_session(ctx);
6673544Sjbeck 			}
6683544Sjbeck 			else
6693544Sjbeck 			{
6703544Sjbeck 				ctx->ctx_wstate = WKST_READY_TO_WAIT;
6713544Sjbeck 
6723544Sjbeck 				POOL_LEV_DPRINTF(4,
6733544Sjbeck 					("writing to event pipe..."));
6743544Sjbeck 
6753544Sjbeck 				/*
6763544Sjbeck 				**  Signal task controller to add new session
6773544Sjbeck 				**  to poll set.
6783544Sjbeck 				*/
6793544Sjbeck 
6803544Sjbeck 				PIPE_SEND_SIGNAL();
6813544Sjbeck 			}
6823544Sjbeck 			TASKMGR_UNLOCK();
6833544Sjbeck 			ctx = NULL;
6843544Sjbeck 
6853544Sjbeck 		}
6863544Sjbeck 
6873544Sjbeck 		/* check if there is any task waiting to be served */
6883544Sjbeck 		TASKMGR_LOCK();
6893544Sjbeck 
6903544Sjbeck 		GET_TASK_READY_TO_RUN();
6913544Sjbeck 
6923544Sjbeck 		/* Got a task? */
6933544Sjbeck 		if (ctx != NULL)
6943544Sjbeck 		{
6953544Sjbeck 			TASKMGR_UNLOCK();
6963544Sjbeck 			continue;
6973544Sjbeck 		}
6983544Sjbeck 
6993544Sjbeck 		/*
7003544Sjbeck 		**  if not, let's check if there is enough idle workers
7013544Sjbeck 		**	if yes: quit
7023544Sjbeck 		*/
7033544Sjbeck 
7043544Sjbeck 		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
7053544Sjbeck 		    Tskmgr.tm_nb_idle > MIN_IDLE)
7063544Sjbeck 			done = true;
7073544Sjbeck 
7083544Sjbeck 		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
7093544Sjbeck 			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
7103544Sjbeck 
7113544Sjbeck 		if (done)
7123544Sjbeck 		{
7133544Sjbeck 			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
7143544Sjbeck 			Tskmgr.tm_nb_workers--;
7153544Sjbeck 			TASKMGR_UNLOCK();
7163544Sjbeck 			continue;
7173544Sjbeck 		}
7183544Sjbeck 
7193544Sjbeck 		/*
7203544Sjbeck 		**  if no task ready to run, wait for another one
7213544Sjbeck 		*/
7223544Sjbeck 
7233544Sjbeck 		Tskmgr.tm_nb_idle++;
7243544Sjbeck 		TASKMGR_COND_WAIT();
7253544Sjbeck 		Tskmgr.tm_nb_idle--;
7263544Sjbeck 
7273544Sjbeck 		/* look for a task */
7283544Sjbeck 		GET_TASK_READY_TO_RUN();
7293544Sjbeck 
7303544Sjbeck 		TASKMGR_UNLOCK();
7313544Sjbeck 	}
7323544Sjbeck 	return NULL;
7333544Sjbeck }
7343544Sjbeck 
7353544Sjbeck /*
7363544Sjbeck **  MI_LIST_ADD_CTX -- add new session to linked list
7373544Sjbeck **
7383544Sjbeck **	Parameters:
7393544Sjbeck **		ctx -- context structure
7403544Sjbeck **
7413544Sjbeck **	Returns:
7423544Sjbeck **		MI_FAILURE/MI_SUCCESS
7433544Sjbeck */
7443544Sjbeck 
7453544Sjbeck static int
mi_list_add_ctx(ctx)7463544Sjbeck mi_list_add_ctx(ctx)
7473544Sjbeck 	SMFICTX_PTR ctx;
7483544Sjbeck {
7493544Sjbeck 	SM_ASSERT(ctx != NULL);
7503544Sjbeck 	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
7513544Sjbeck 	return MI_SUCCESS;
7523544Sjbeck }
7533544Sjbeck 
7543544Sjbeck /*
7553544Sjbeck **  MI_LIST_DEL_CTX -- remove session from linked list when finished
7563544Sjbeck **
7573544Sjbeck **	Parameters:
7583544Sjbeck **		ctx -- context structure
7593544Sjbeck **
7603544Sjbeck **	Returns:
7613544Sjbeck **		MI_FAILURE/MI_SUCCESS
7623544Sjbeck */
7633544Sjbeck 
7643544Sjbeck static int
mi_list_del_ctx(ctx)7653544Sjbeck mi_list_del_ctx(ctx)
7663544Sjbeck 	SMFICTX_PTR ctx;
7673544Sjbeck {
7683544Sjbeck 	SM_ASSERT(ctx != NULL);
7693544Sjbeck 	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
7703544Sjbeck 		return MI_FAILURE;
7713544Sjbeck 
7723544Sjbeck 	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
7733544Sjbeck 	return MI_SUCCESS;
7743544Sjbeck }
7753544Sjbeck #endif /* _FFR_WORKERS_POOL */
776