13544Sjbeck /*
2*11440SJohn.Beck@Sun.COM * Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
33544Sjbeck * All rights reserved.
43544Sjbeck *
53544Sjbeck * By using this file, you agree to the terms and conditions set
63544Sjbeck * forth in the LICENSE file which can be found at the top level of
73544Sjbeck * the sendmail distribution.
83544Sjbeck *
93544Sjbeck * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
103544Sjbeck * Jose-Marcio.Martins@ensmp.fr
113544Sjbeck */
123544Sjbeck
133544Sjbeck #include <sm/gen.h>
14*11440SJohn.Beck@Sun.COM SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")
153544Sjbeck
163544Sjbeck #include "libmilter.h"
173544Sjbeck
183544Sjbeck #if _FFR_WORKERS_POOL
193544Sjbeck
203544Sjbeck typedef struct taskmgr_S taskmgr_T;
213544Sjbeck
223544Sjbeck #define TM_SIGNATURE 0x23021957
233544Sjbeck
243544Sjbeck struct taskmgr_S
253544Sjbeck {
263544Sjbeck long tm_signature; /* has the controller been initialized */
273544Sjbeck sthread_t tm_tid; /* thread id of controller */
283544Sjbeck smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
293544Sjbeck
303544Sjbeck int tm_nb_workers; /* number of workers in the pool */
313544Sjbeck int tm_nb_idle; /* number of workers waiting */
323544Sjbeck
333544Sjbeck int tm_p[2]; /* poll control pipe */
343544Sjbeck
353544Sjbeck smutex_t tm_w_mutex; /* linked list access mutex */
363544Sjbeck scond_t tm_w_cond; /* */
373544Sjbeck };
383544Sjbeck
393544Sjbeck static taskmgr_T Tskmgr = {0};
403544Sjbeck
413544Sjbeck #define WRK_CTX_HEAD Tskmgr.tm_ctx_head
423544Sjbeck
433544Sjbeck #define RD_PIPE (Tskmgr.tm_p[0])
443544Sjbeck #define WR_PIPE (Tskmgr.tm_p[1])
453544Sjbeck
463544Sjbeck #define PIPE_SEND_SIGNAL() \
473544Sjbeck do \
483544Sjbeck { \
493544Sjbeck char evt = 0x5a; \
503544Sjbeck int fd = WR_PIPE; \
513544Sjbeck if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
523544Sjbeck smi_log(SMI_LOG_ERR, \
533544Sjbeck "Error writing to event pipe: %s", \
543544Sjbeck sm_errstring(errno)); \
553544Sjbeck } while (0)
563544Sjbeck
573544Sjbeck #ifndef USE_PIPE_WAKE_POLL
583544Sjbeck # define USE_PIPE_WAKE_POLL 1
593544Sjbeck #endif /* USE_PIPE_WAKE_POLL */
603544Sjbeck
613544Sjbeck /* poll check periodicity (default 10000 - 10 s) */
623544Sjbeck #define POLL_TIMEOUT 10000
633544Sjbeck
643544Sjbeck /* worker conditional wait timeout (default 10 s) */
653544Sjbeck #define COND_TIMEOUT 10
663544Sjbeck
673544Sjbeck /* functions */
683544Sjbeck static int mi_close_session __P((SMFICTX_PTR));
693544Sjbeck
703544Sjbeck static void *mi_worker __P((void *));
713544Sjbeck static void *mi_pool_controller __P((void *));
723544Sjbeck
733544Sjbeck static int mi_list_add_ctx __P((SMFICTX_PTR));
743544Sjbeck static int mi_list_del_ctx __P((SMFICTX_PTR));
753544Sjbeck
763544Sjbeck /*
773544Sjbeck ** periodicity of cleaning up old sessions (timedout)
783544Sjbeck ** sessions list will be checked to find old inactive
793544Sjbeck ** sessions each DT_CHECK_OLD_SESSIONS sec
803544Sjbeck */
813544Sjbeck
823544Sjbeck #define DT_CHECK_OLD_SESSIONS 600
833544Sjbeck
843544Sjbeck #ifndef OLD_SESSION_TIMEOUT
853544Sjbeck # define OLD_SESSION_TIMEOUT ctx->ctx_timeout
863544Sjbeck #endif /* OLD_SESSION_TIMEOUT */
873544Sjbeck
883544Sjbeck /* session states - with respect to the pool of workers */
893544Sjbeck #define WKST_INIT 0 /* initial state */
903544Sjbeck #define WKST_READY_TO_RUN 1 /* command ready do be read */
913544Sjbeck #define WKST_RUNNING 2 /* session running on a worker */
923544Sjbeck #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
933544Sjbeck #define WKST_WAITING 4 /* waiting for new command */
943544Sjbeck #define WKST_CLOSING 5 /* session finished */
953544Sjbeck
963544Sjbeck #ifndef MIN_WORKERS
973544Sjbeck # define MIN_WORKERS 2 /* minimum number of threads to keep around */
983544Sjbeck #endif
993544Sjbeck
1003544Sjbeck #define MIN_IDLE 1 /* minimum number of idle threads */
1013544Sjbeck
1023544Sjbeck
1033544Sjbeck /*
1043544Sjbeck ** Macros for threads and mutex management
1053544Sjbeck */
1063544Sjbeck
1073544Sjbeck #define TASKMGR_LOCK() \
1083544Sjbeck do \
1093544Sjbeck { \
1103544Sjbeck if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
1113544Sjbeck smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
1123544Sjbeck } while (0)
1133544Sjbeck
1143544Sjbeck #define TASKMGR_UNLOCK() \
1153544Sjbeck do \
1163544Sjbeck { \
1173544Sjbeck if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
1183544Sjbeck smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
1193544Sjbeck } while (0)
1203544Sjbeck
1213544Sjbeck #define TASKMGR_COND_WAIT() \
1223544Sjbeck scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
1233544Sjbeck
1243544Sjbeck #define TASKMGR_COND_SIGNAL() \
1253544Sjbeck do \
1263544Sjbeck { \
1273544Sjbeck if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
1283544Sjbeck smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
1293544Sjbeck } while (0)
1303544Sjbeck
1313544Sjbeck #define LAUNCH_WORKER(ctx) \
1323544Sjbeck do \
1333544Sjbeck { \
1343544Sjbeck int r; \
1353544Sjbeck sthread_t tid; \
1363544Sjbeck \
1373544Sjbeck if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
1383544Sjbeck smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
1393544Sjbeck sm_errstring(r)); \
1403544Sjbeck } while (0)
1413544Sjbeck
1423544Sjbeck #if POOL_DEBUG
1433544Sjbeck # define POOL_LEV_DPRINTF(lev, x) \
1443544Sjbeck do { \
1453544Sjbeck if ((lev) < ctx->ctx_dbg) \
1463544Sjbeck sm_dprintf x; \
1473544Sjbeck } while (0)
1483544Sjbeck #else /* POOL_DEBUG */
1493544Sjbeck # define POOL_LEV_DPRINTF(lev, x)
1503544Sjbeck #endif /* POOL_DEBUG */
1513544Sjbeck
1523544Sjbeck /*
1533544Sjbeck ** MI_START_SESSION -- Start a session in the pool of workers
1543544Sjbeck **
1553544Sjbeck ** Parameters:
1563544Sjbeck ** ctx -- context structure
1573544Sjbeck **
1583544Sjbeck ** Returns:
1593544Sjbeck ** MI_SUCCESS/MI_FAILURE
1603544Sjbeck */
1613544Sjbeck
1623544Sjbeck int
mi_start_session(ctx)1633544Sjbeck mi_start_session(ctx)
1643544Sjbeck SMFICTX_PTR ctx;
1653544Sjbeck {
1663544Sjbeck static long id = 0;
1673544Sjbeck
1683544Sjbeck SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
1693544Sjbeck SM_ASSERT(ctx != NULL);
1703544Sjbeck POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
1713544Sjbeck TASKMGR_LOCK();
1723544Sjbeck
1733544Sjbeck if (mi_list_add_ctx(ctx) != MI_SUCCESS)
1743544Sjbeck {
1753544Sjbeck TASKMGR_UNLOCK();
1763544Sjbeck return MI_FAILURE;
1773544Sjbeck }
1783544Sjbeck
1793544Sjbeck ctx->ctx_sid = id++;
1803544Sjbeck
1813544Sjbeck /* if there is an idle worker, signal it, otherwise start new worker */
1823544Sjbeck if (Tskmgr.tm_nb_idle > 0)
1833544Sjbeck {
1843544Sjbeck ctx->ctx_wstate = WKST_READY_TO_RUN;
1853544Sjbeck TASKMGR_COND_SIGNAL();
1863544Sjbeck }
1873544Sjbeck else
1883544Sjbeck {
1893544Sjbeck ctx->ctx_wstate = WKST_RUNNING;
1903544Sjbeck LAUNCH_WORKER(ctx);
1913544Sjbeck }
1923544Sjbeck TASKMGR_UNLOCK();
1933544Sjbeck return MI_SUCCESS;
1943544Sjbeck }
1953544Sjbeck
1963544Sjbeck /*
1973544Sjbeck ** MI_CLOSE_SESSION -- Close a session and clean up data structures
1983544Sjbeck **
1993544Sjbeck ** Parameters:
2003544Sjbeck ** ctx -- context structure
2013544Sjbeck **
2023544Sjbeck ** Returns:
2033544Sjbeck ** MI_SUCCESS/MI_FAILURE
2043544Sjbeck */
2053544Sjbeck
2063544Sjbeck static int
mi_close_session(ctx)2073544Sjbeck mi_close_session(ctx)
2083544Sjbeck SMFICTX_PTR ctx;
2093544Sjbeck {
2103544Sjbeck SM_ASSERT(ctx != NULL);
2113544Sjbeck
2123544Sjbeck (void) mi_list_del_ctx(ctx);
213*11440SJohn.Beck@Sun.COM mi_clr_ctx(ctx);
2143544Sjbeck
2153544Sjbeck return MI_SUCCESS;
2163544Sjbeck }
2173544Sjbeck
2183544Sjbeck /*
2193544Sjbeck ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
2203544Sjbeck ** Must be called before starting sessions.
2213544Sjbeck **
2223544Sjbeck ** Parameters:
2233544Sjbeck ** none
2243544Sjbeck **
2253544Sjbeck ** Returns:
2263544Sjbeck ** MI_SUCCESS/MI_FAILURE
2273544Sjbeck */
2283544Sjbeck
2293544Sjbeck int
mi_pool_controller_init()2303544Sjbeck mi_pool_controller_init()
2313544Sjbeck {
2323544Sjbeck sthread_t tid;
2333544Sjbeck int r, i;
2343544Sjbeck
2353544Sjbeck if (Tskmgr.tm_signature == TM_SIGNATURE)
2363544Sjbeck return MI_SUCCESS;
2373544Sjbeck
2383544Sjbeck SM_TAILQ_INIT(&WRK_CTX_HEAD);
2393544Sjbeck Tskmgr.tm_tid = (sthread_t) -1;
2403544Sjbeck Tskmgr.tm_nb_workers = 0;
2413544Sjbeck Tskmgr.tm_nb_idle = 0;
2423544Sjbeck
2433544Sjbeck if (pipe(Tskmgr.tm_p) != 0)
2443544Sjbeck {
2453544Sjbeck smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
246*11440SJohn.Beck@Sun.COM sm_errstring(errno));
2473544Sjbeck return MI_FAILURE;
2483544Sjbeck }
2493544Sjbeck
2503544Sjbeck (void) smutex_init(&Tskmgr.tm_w_mutex);
2513544Sjbeck (void) scond_init(&Tskmgr.tm_w_cond);
2523544Sjbeck
2533544Sjbeck /* Launch the pool controller */
2543544Sjbeck if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
2553544Sjbeck {
2563544Sjbeck smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
2573544Sjbeck sm_errstring(r));
2583544Sjbeck return MI_FAILURE;
2593544Sjbeck }
2603544Sjbeck Tskmgr.tm_tid = tid;
2613544Sjbeck Tskmgr.tm_signature = TM_SIGNATURE;
2623544Sjbeck
2633544Sjbeck /* Create the pool of workers */
2643544Sjbeck for (i = 0; i < MIN_WORKERS; i++)
2653544Sjbeck {
2663544Sjbeck if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
2673544Sjbeck {
2683544Sjbeck smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
2693544Sjbeck sm_errstring(r));
2703544Sjbeck return MI_FAILURE;
2713544Sjbeck }
2723544Sjbeck }
2733544Sjbeck
2743544Sjbeck return MI_SUCCESS;
2753544Sjbeck }
2763544Sjbeck
2773544Sjbeck /*
2783544Sjbeck ** MI_POOL_CONTROLLER -- manage the pool of workers
2793544Sjbeck ** This thread must be running when listener begins
2803544Sjbeck ** starting sessions
2813544Sjbeck **
2823544Sjbeck ** Parameters:
2833544Sjbeck ** arg -- unused
2843544Sjbeck **
2853544Sjbeck ** Returns:
2863544Sjbeck ** NULL
2873544Sjbeck **
2883544Sjbeck ** Control flow:
2893544Sjbeck ** for (;;)
2903544Sjbeck ** Look for timed out sessions
2913544Sjbeck ** Select sessions to wait for sendmail command
2923544Sjbeck ** Poll set of file descriptors
2933544Sjbeck ** if timeout
2943544Sjbeck ** continue
2953544Sjbeck ** For each file descriptor ready
2963544Sjbeck ** launch new thread if no worker available
2973544Sjbeck ** else
2983544Sjbeck ** signal waiting worker
2993544Sjbeck */
3003544Sjbeck
3013544Sjbeck /* Poll structure array (pollfd) size step */
3023544Sjbeck #define PFD_STEP 256
3033544Sjbeck
3043544Sjbeck #define WAIT_FD(i) (pfd[i].fd)
3053544Sjbeck #define WAITFN "POLL"
3063544Sjbeck
3073544Sjbeck static void *
mi_pool_controller(arg)3083544Sjbeck mi_pool_controller(arg)
3093544Sjbeck void *arg;
3103544Sjbeck {
3113544Sjbeck struct pollfd *pfd = NULL;
3123544Sjbeck int dim_pfd = 0;
3133544Sjbeck bool rebuild_set = true;
3143544Sjbeck int pcnt = 0; /* error count for poll() failures */
315*11440SJohn.Beck@Sun.COM time_t lastcheck;
3163544Sjbeck
3173544Sjbeck Tskmgr.tm_tid = sthread_get_id();
3183544Sjbeck if (pthread_detach(Tskmgr.tm_tid) != 0)
3193544Sjbeck {
3203544Sjbeck smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
3213544Sjbeck return NULL;
3223544Sjbeck }
3233544Sjbeck
3243544Sjbeck pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
3253544Sjbeck if (pfd == NULL)
3263544Sjbeck {
3273544Sjbeck smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
3283544Sjbeck sm_errstring(errno));
3293544Sjbeck return NULL;
3303544Sjbeck }
3313544Sjbeck dim_pfd = PFD_STEP;
3323544Sjbeck
333*11440SJohn.Beck@Sun.COM lastcheck = time(NULL);
3343544Sjbeck for (;;)
3353544Sjbeck {
3363544Sjbeck SMFICTX_PTR ctx;
3373544Sjbeck int nfd, rfd, i;
3383544Sjbeck time_t now;
3393544Sjbeck
3403544Sjbeck POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
3413544Sjbeck
3423544Sjbeck if (mi_stop() != MILTER_CONT)
3433544Sjbeck break;
3443544Sjbeck
3453544Sjbeck TASKMGR_LOCK();
3463544Sjbeck
3473544Sjbeck now = time(NULL);
3483544Sjbeck
3493544Sjbeck /* check for timed out sessions? */
3503544Sjbeck if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
3513544Sjbeck {
352*11440SJohn.Beck@Sun.COM ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
353*11440SJohn.Beck@Sun.COM while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
3543544Sjbeck {
355*11440SJohn.Beck@Sun.COM SMFICTX_PTR ctx_nxt;
356*11440SJohn.Beck@Sun.COM
357*11440SJohn.Beck@Sun.COM ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
3583544Sjbeck if (ctx->ctx_wstate == WKST_WAITING)
3593544Sjbeck {
3603544Sjbeck if (ctx->ctx_wait == 0)
361*11440SJohn.Beck@Sun.COM ctx->ctx_wait = now;
362*11440SJohn.Beck@Sun.COM else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
363*11440SJohn.Beck@Sun.COM < now)
3643544Sjbeck {
365*11440SJohn.Beck@Sun.COM /* if session timed out, close it */
3663544Sjbeck sfsistat (*fi_close) __P((SMFICTX *));
3673544Sjbeck
3683544Sjbeck POOL_LEV_DPRINTF(4,
3693544Sjbeck ("Closing old connection: sd=%d id=%d",
3703544Sjbeck ctx->ctx_sd,
3713544Sjbeck ctx->ctx_sid));
3723544Sjbeck
3733544Sjbeck if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
3743544Sjbeck (void) (*fi_close)(ctx);
3753544Sjbeck
3763544Sjbeck mi_close_session(ctx);
3773544Sjbeck }
3783544Sjbeck }
379*11440SJohn.Beck@Sun.COM ctx = ctx_nxt;
3803544Sjbeck }
3813544Sjbeck lastcheck = now;
3823544Sjbeck }
3833544Sjbeck
3843544Sjbeck if (rebuild_set)
3853544Sjbeck {
3863544Sjbeck /*
3873544Sjbeck ** Initialize poll set.
3883544Sjbeck ** Insert into the poll set the file descriptors of
3893544Sjbeck ** all sessions waiting for a command from sendmail.
3903544Sjbeck */
3913544Sjbeck
3923544Sjbeck nfd = 0;
3933544Sjbeck
3943544Sjbeck /* begin with worker pipe */
3953544Sjbeck pfd[nfd].fd = RD_PIPE;
3963544Sjbeck pfd[nfd].events = MI_POLL_RD_FLAGS;
3973544Sjbeck pfd[nfd].revents = 0;
3983544Sjbeck nfd++;
3993544Sjbeck
4003544Sjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
4013544Sjbeck {
4023544Sjbeck /*
4033544Sjbeck ** update ctx_wait - start of wait moment -
4043544Sjbeck ** for timeout
4053544Sjbeck */
4063544Sjbeck
4073544Sjbeck if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
4083544Sjbeck ctx->ctx_wait = now;
4093544Sjbeck
4103544Sjbeck /* add the session to the pollfd array? */
4113544Sjbeck if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
4123544Sjbeck (ctx->ctx_wstate == WKST_WAITING))
4133544Sjbeck {
4143544Sjbeck /*
4153544Sjbeck ** Resize the pollfd array if it
4163544Sjbeck ** isn't large enough.
4173544Sjbeck */
4183544Sjbeck
4193544Sjbeck if (nfd >= dim_pfd)
4203544Sjbeck {
4213544Sjbeck struct pollfd *tpfd;
4223544Sjbeck size_t new;
4233544Sjbeck
4243544Sjbeck new = (dim_pfd + PFD_STEP) *
4253544Sjbeck sizeof(*tpfd);
4263544Sjbeck tpfd = (struct pollfd *)
4273544Sjbeck realloc(pfd, new);
4283544Sjbeck if (tpfd != NULL)
4293544Sjbeck {
4303544Sjbeck pfd = tpfd;
4313544Sjbeck dim_pfd += PFD_STEP;
4323544Sjbeck }
4333544Sjbeck else
4343544Sjbeck {
4353544Sjbeck smi_log(SMI_LOG_ERR,
4363544Sjbeck "Failed to realloc pollfd array:%s",
4373544Sjbeck sm_errstring(errno));
4383544Sjbeck }
4393544Sjbeck }
4403544Sjbeck
4413544Sjbeck /* add the session to pollfd array */
4423544Sjbeck if (nfd < dim_pfd)
4433544Sjbeck {
4443544Sjbeck ctx->ctx_wstate = WKST_WAITING;
4453544Sjbeck pfd[nfd].fd = ctx->ctx_sd;
4463544Sjbeck pfd[nfd].events = MI_POLL_RD_FLAGS;
4473544Sjbeck pfd[nfd].revents = 0;
4483544Sjbeck nfd++;
4493544Sjbeck }
4503544Sjbeck }
4513544Sjbeck }
452*11440SJohn.Beck@Sun.COM rebuild_set = false;
4533544Sjbeck }
4543544Sjbeck
4553544Sjbeck TASKMGR_UNLOCK();
4563544Sjbeck
4573544Sjbeck /* Everything is ready, let's wait for an event */
4583544Sjbeck rfd = poll(pfd, nfd, POLL_TIMEOUT);
4593544Sjbeck
4603544Sjbeck POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
4613544Sjbeck WAITFN, now, nfd));
4623544Sjbeck
4633544Sjbeck /* timeout */
4643544Sjbeck if (rfd == 0)
4653544Sjbeck continue;
4663544Sjbeck
4673544Sjbeck rebuild_set = true;
4683544Sjbeck
4693544Sjbeck /* error */
4703544Sjbeck if (rfd < 0)
4713544Sjbeck {
4723544Sjbeck if (errno == EINTR)
4733544Sjbeck continue;
4743544Sjbeck pcnt++;
4753544Sjbeck smi_log(SMI_LOG_ERR,
4763544Sjbeck "%s() failed (%s), %s",
4773544Sjbeck WAITFN, sm_errstring(errno),
4783544Sjbeck pcnt >= MAX_FAILS_S ? "abort" : "try again");
4793544Sjbeck
4803544Sjbeck if (pcnt >= MAX_FAILS_S)
4813544Sjbeck goto err;
4823544Sjbeck }
4833544Sjbeck pcnt = 0;
4843544Sjbeck
4853544Sjbeck /* something happened */
4863544Sjbeck for (i = 0; i < nfd; i++)
4873544Sjbeck {
4883544Sjbeck if (pfd[i].revents == 0)
4893544Sjbeck continue;
4903544Sjbeck
4913544Sjbeck POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
4923544Sjbeck WAITFN, i, nfd,
4933544Sjbeck WAIT_FD(i)));
4943544Sjbeck
4953544Sjbeck /* has a worker signaled an end of task ? */
4963544Sjbeck if (WAIT_FD(i) == RD_PIPE)
4973544Sjbeck {
4983544Sjbeck char evt = 0;
4993544Sjbeck int r = 0;
5003544Sjbeck
5013544Sjbeck POOL_LEV_DPRINTF(4,
5023544Sjbeck ("PIPE WILL READ evt = %08X %08X",
5033544Sjbeck pfd[i].events, pfd[i].revents));
5043544Sjbeck
5053544Sjbeck if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
5063544Sjbeck {
5073544Sjbeck r = read(RD_PIPE, &evt, sizeof(evt));
5083544Sjbeck if (r == sizeof(evt))
5093544Sjbeck {
5103544Sjbeck /* Do nothing */
5113544Sjbeck }
5123544Sjbeck }
5133544Sjbeck
5143544Sjbeck POOL_LEV_DPRINTF(4,
5153544Sjbeck ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
5163544Sjbeck i, RD_PIPE, r, evt));
5173544Sjbeck
5183544Sjbeck if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
5193544Sjbeck {
5203544Sjbeck /* Exception handling */
5213544Sjbeck }
5223544Sjbeck continue;
5233544Sjbeck }
5243544Sjbeck
5253544Sjbeck /* no ! sendmail wants to send a command */
5263544Sjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
5273544Sjbeck {
5283544Sjbeck if (ctx->ctx_wstate != WKST_WAITING)
5293544Sjbeck continue;
5303544Sjbeck
5313544Sjbeck POOL_LEV_DPRINTF(4,
5323544Sjbeck ("Checking context sd=%d - fd=%d ",
5333544Sjbeck ctx->ctx_sd , WAIT_FD(i)));
5343544Sjbeck
5353544Sjbeck if (ctx->ctx_sd == pfd[i].fd)
5363544Sjbeck {
5373544Sjbeck TASKMGR_LOCK();
5383544Sjbeck
5393544Sjbeck POOL_LEV_DPRINTF(4,
5403544Sjbeck ("TASK: found %d for fd[%d]=%d",
5413544Sjbeck ctx->ctx_sid, i, WAIT_FD(i)));
5423544Sjbeck
5433544Sjbeck if (Tskmgr.tm_nb_idle > 0)
5443544Sjbeck {
5453544Sjbeck ctx->ctx_wstate = WKST_READY_TO_RUN;
5463544Sjbeck TASKMGR_COND_SIGNAL();
5473544Sjbeck }
5483544Sjbeck else
5493544Sjbeck {
5503544Sjbeck ctx->ctx_wstate = WKST_RUNNING;
5513544Sjbeck LAUNCH_WORKER(ctx);
5523544Sjbeck }
5533544Sjbeck TASKMGR_UNLOCK();
5543544Sjbeck break;
5553544Sjbeck }
5563544Sjbeck }
5573544Sjbeck
5583544Sjbeck POOL_LEV_DPRINTF(4,
5593544Sjbeck ("TASK %s FOUND - Checking PIPE for fd[%d]",
5603544Sjbeck ctx != NULL ? "" : "NOT", WAIT_FD(i)));
5613544Sjbeck }
5623544Sjbeck }
5633544Sjbeck
5643544Sjbeck err:
5653544Sjbeck if (pfd != NULL)
5663544Sjbeck free(pfd);
5673544Sjbeck
5683544Sjbeck Tskmgr.tm_signature = 0;
5693544Sjbeck for (;;)
5703544Sjbeck {
5713544Sjbeck SMFICTX_PTR ctx;
5723544Sjbeck
5733544Sjbeck ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
5743544Sjbeck if (ctx == NULL)
5753544Sjbeck break;
5763544Sjbeck mi_close_session(ctx);
5773544Sjbeck }
5783544Sjbeck
5793544Sjbeck (void) smutex_destroy(&Tskmgr.tm_w_mutex);
5803544Sjbeck (void) scond_destroy(&Tskmgr.tm_w_cond);
5813544Sjbeck
5823544Sjbeck return NULL;
5833544Sjbeck }
5843544Sjbeck
5853544Sjbeck /*
5863544Sjbeck ** Look for a task ready to run.
5873544Sjbeck ** Value of ctx is NULL or a pointer to a task ready to run.
5883544Sjbeck */
5893544Sjbeck
5903544Sjbeck #define GET_TASK_READY_TO_RUN() \
5913544Sjbeck SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
5923544Sjbeck { \
5933544Sjbeck if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
5943544Sjbeck { \
5953544Sjbeck ctx->ctx_wstate = WKST_RUNNING; \
5963544Sjbeck break; \
5973544Sjbeck } \
5983544Sjbeck }
5993544Sjbeck
6003544Sjbeck /*
6013544Sjbeck ** MI_WORKER -- worker thread
6023544Sjbeck ** executes tasks distributed by the mi_pool_controller
6033544Sjbeck ** or by mi_start_session
6043544Sjbeck **
6053544Sjbeck ** Parameters:
6063544Sjbeck ** arg -- pointer to context structure
6073544Sjbeck **
6083544Sjbeck ** Returns:
6093544Sjbeck ** NULL pointer
6103544Sjbeck */
6113544Sjbeck
6123544Sjbeck static void *
mi_worker(arg)6133544Sjbeck mi_worker(arg)
6143544Sjbeck void *arg;
6153544Sjbeck {
6163544Sjbeck SMFICTX_PTR ctx;
6173544Sjbeck bool done;
6183544Sjbeck sthread_t t_id;
6193544Sjbeck int r;
6203544Sjbeck
6213544Sjbeck ctx = (SMFICTX_PTR) arg;
6223544Sjbeck done = false;
6233544Sjbeck if (ctx != NULL)
6243544Sjbeck ctx->ctx_wstate = WKST_RUNNING;
6253544Sjbeck
6263544Sjbeck t_id = sthread_get_id();
6273544Sjbeck if (pthread_detach(t_id) != 0)
6283544Sjbeck {
6293544Sjbeck smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
6303544Sjbeck if (ctx != NULL)
6313544Sjbeck ctx->ctx_wstate = WKST_READY_TO_RUN;
6323544Sjbeck return NULL;
6333544Sjbeck }
6343544Sjbeck
6353544Sjbeck TASKMGR_LOCK();
6363544Sjbeck Tskmgr.tm_nb_workers++;
6373544Sjbeck TASKMGR_UNLOCK();
6383544Sjbeck
6393544Sjbeck while (!done)
6403544Sjbeck {
6413544Sjbeck if (mi_stop() != MILTER_CONT)
6423544Sjbeck break;
6433544Sjbeck
6443544Sjbeck /* let's handle next task... */
6453544Sjbeck if (ctx != NULL)
6463544Sjbeck {
6473544Sjbeck int res;
6483544Sjbeck
6493544Sjbeck POOL_LEV_DPRINTF(4,
6503544Sjbeck ("worker %d: new task -> let's handle it",
6513544Sjbeck t_id));
6523544Sjbeck res = mi_engine(ctx);
6533544Sjbeck POOL_LEV_DPRINTF(4,
6543544Sjbeck ("worker %d: mi_engine returned %d", t_id, res));
6553544Sjbeck
6563544Sjbeck TASKMGR_LOCK();
6573544Sjbeck if (res != MI_CONTINUE)
6583544Sjbeck {
6593544Sjbeck ctx->ctx_wstate = WKST_CLOSING;
6603544Sjbeck
6613544Sjbeck /*
6623544Sjbeck ** Delete context from linked list of
6633544Sjbeck ** sessions and close session.
6643544Sjbeck */
6653544Sjbeck
6663544Sjbeck mi_close_session(ctx);
6673544Sjbeck }
6683544Sjbeck else
6693544Sjbeck {
6703544Sjbeck ctx->ctx_wstate = WKST_READY_TO_WAIT;
6713544Sjbeck
6723544Sjbeck POOL_LEV_DPRINTF(4,
6733544Sjbeck ("writing to event pipe..."));
6743544Sjbeck
6753544Sjbeck /*
6763544Sjbeck ** Signal task controller to add new session
6773544Sjbeck ** to poll set.
6783544Sjbeck */
6793544Sjbeck
6803544Sjbeck PIPE_SEND_SIGNAL();
6813544Sjbeck }
6823544Sjbeck TASKMGR_UNLOCK();
6833544Sjbeck ctx = NULL;
6843544Sjbeck
6853544Sjbeck }
6863544Sjbeck
6873544Sjbeck /* check if there is any task waiting to be served */
6883544Sjbeck TASKMGR_LOCK();
6893544Sjbeck
6903544Sjbeck GET_TASK_READY_TO_RUN();
6913544Sjbeck
6923544Sjbeck /* Got a task? */
6933544Sjbeck if (ctx != NULL)
6943544Sjbeck {
6953544Sjbeck TASKMGR_UNLOCK();
6963544Sjbeck continue;
6973544Sjbeck }
6983544Sjbeck
6993544Sjbeck /*
7003544Sjbeck ** if not, let's check if there is enough idle workers
7013544Sjbeck ** if yes: quit
7023544Sjbeck */
7033544Sjbeck
7043544Sjbeck if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
7053544Sjbeck Tskmgr.tm_nb_idle > MIN_IDLE)
7063544Sjbeck done = true;
7073544Sjbeck
7083544Sjbeck POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
7093544Sjbeck Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
7103544Sjbeck
7113544Sjbeck if (done)
7123544Sjbeck {
7133544Sjbeck POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
7143544Sjbeck Tskmgr.tm_nb_workers--;
7153544Sjbeck TASKMGR_UNLOCK();
7163544Sjbeck continue;
7173544Sjbeck }
7183544Sjbeck
7193544Sjbeck /*
7203544Sjbeck ** if no task ready to run, wait for another one
7213544Sjbeck */
7223544Sjbeck
7233544Sjbeck Tskmgr.tm_nb_idle++;
7243544Sjbeck TASKMGR_COND_WAIT();
7253544Sjbeck Tskmgr.tm_nb_idle--;
7263544Sjbeck
7273544Sjbeck /* look for a task */
7283544Sjbeck GET_TASK_READY_TO_RUN();
7293544Sjbeck
7303544Sjbeck TASKMGR_UNLOCK();
7313544Sjbeck }
7323544Sjbeck return NULL;
7333544Sjbeck }
7343544Sjbeck
7353544Sjbeck /*
7363544Sjbeck ** MI_LIST_ADD_CTX -- add new session to linked list
7373544Sjbeck **
7383544Sjbeck ** Parameters:
7393544Sjbeck ** ctx -- context structure
7403544Sjbeck **
7413544Sjbeck ** Returns:
7423544Sjbeck ** MI_FAILURE/MI_SUCCESS
7433544Sjbeck */
7443544Sjbeck
7453544Sjbeck static int
mi_list_add_ctx(ctx)7463544Sjbeck mi_list_add_ctx(ctx)
7473544Sjbeck SMFICTX_PTR ctx;
7483544Sjbeck {
7493544Sjbeck SM_ASSERT(ctx != NULL);
7503544Sjbeck SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
7513544Sjbeck return MI_SUCCESS;
7523544Sjbeck }
7533544Sjbeck
7543544Sjbeck /*
7553544Sjbeck ** MI_LIST_DEL_CTX -- remove session from linked list when finished
7563544Sjbeck **
7573544Sjbeck ** Parameters:
7583544Sjbeck ** ctx -- context structure
7593544Sjbeck **
7603544Sjbeck ** Returns:
7613544Sjbeck ** MI_FAILURE/MI_SUCCESS
7623544Sjbeck */
7633544Sjbeck
7643544Sjbeck static int
mi_list_del_ctx(ctx)7653544Sjbeck mi_list_del_ctx(ctx)
7663544Sjbeck SMFICTX_PTR ctx;
7673544Sjbeck {
7683544Sjbeck SM_ASSERT(ctx != NULL);
7693544Sjbeck if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
7703544Sjbeck return MI_FAILURE;
7713544Sjbeck
7723544Sjbeck SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
7733544Sjbeck return MI_SUCCESS;
7743544Sjbeck }
7753544Sjbeck #endif /* _FFR_WORKERS_POOL */
776