root/usr/src/cmd/sendmail/libmilter/worker.c
/*
 *  Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
 *      All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 *
 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
 *   Jose-Marcio.Martins@ensmp.fr
 */

#include <sm/gen.h>
SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")

#include "libmilter.h"

#if _FFR_WORKERS_POOL

/*
**  Task manager: global state shared by the pool controller thread,
**  the worker threads and mi_start_session().  Most accesses to the
**  session list and to the worker counters are made with tm_w_mutex
**  held (TASKMGR_LOCK()/TASKMGR_UNLOCK()).
*/

typedef struct taskmgr_S taskmgr_T;

/* value stored in tm_signature once the pool controller is launched */
#define TM_SIGNATURE            0x23021957

struct taskmgr_S
{
        long            tm_signature; /* TM_SIGNATURE after init, 0 before init / after controller exit */
        sthread_t       tm_tid; /* thread id of controller */
        smfi_hd_T       tm_ctx_head; /* head of the linked list of contexts */

        int             tm_nb_workers;  /* number of workers in the pool */
        int             tm_nb_idle;     /* number of workers blocked in TASKMGR_COND_WAIT() */

        int             tm_p[2];        /* event pipe: workers write tm_p[1], controller polls tm_p[0] */

        smutex_t        tm_w_mutex;     /* linked list access mutex */
        scond_t         tm_w_cond;      /* idle workers wait here; signalled when a session is ready to run */
};

/* the single task manager instance (zero-filled until initialized) */
static taskmgr_T     Tskmgr = {0};

/* head of the list of sessions managed by the pool of workers */
#define WRK_CTX_HEAD    Tskmgr.tm_ctx_head

/* read and write ends of the event pipe used to wake up the controller */
#define RD_PIPE (Tskmgr.tm_p[0])
#define WR_PIPE (Tskmgr.tm_p[1])

/*
**  PIPE_SEND_SIGNAL -- write one byte to the event pipe so that the pool
**      controller's poll() returns and the poll set gets rebuilt.
**      The write is retried when it is interrupted by a signal (EINTR):
**      a lost wake-up would keep the session out of the poll set until
**      some other event makes the controller rebuild it (a poll timeout
**      does not).
*/

#define PIPE_SEND_SIGNAL()                                              \
        do                                                              \
        {                                                               \
                char evt = 0x5a;                                        \
                int fd = WR_PIPE;                                       \
                ssize_t nw;                                             \
                                                                        \
                do                                                      \
                        nw = write(fd, &evt, sizeof(evt));              \
                while (nw < 0 && errno == EINTR);                       \
                if (nw != (ssize_t) sizeof(evt))                        \
                        smi_log(SMI_LOG_ERR,                            \
                                "Error writing to event pipe: %s",      \
                                sm_errstring(errno));                   \
        } while (0)

/* not referenced elsewhere in this file */
#ifndef USE_PIPE_WAKE_POLL
# define USE_PIPE_WAKE_POLL 1
#endif /* USE_PIPE_WAKE_POLL */

/* poll check periodicity (default 10000 - 10 s) */
#define POLL_TIMEOUT   10000

/* worker conditional wait timeout (default 10 s) */
#define COND_TIMEOUT     10

/* file-local routines, defined further below */
static void *mi_pool_controller __P((void *arg));
static void *mi_worker __P((void *arg));

static int mi_close_session __P((SMFICTX_PTR ctx));
static int mi_list_add_ctx __P((SMFICTX_PTR ctx));
static int mi_list_del_ctx __P((SMFICTX_PTR ctx));

/*
**  Periodicity, in seconds, of the scan for old (timed out) sessions:
**  every DT_CHECK_OLD_SESSIONS seconds the pool controller walks the
**  session list looking for sessions that have been waiting for a
**  command for too long.
*/

#define DT_CHECK_OLD_SESSIONS   600

/*
**  How long a session may wait for a command before it is closed.
**  The default is the session's own timeout, so it expects a variable
**  named ctx wherever it is expanded.
*/

#ifndef OLD_SESSION_TIMEOUT
# define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
#endif /* OLD_SESSION_TIMEOUT */

/* lower bounds for the pool; MIN_WORKERS may be set at build time */
#ifndef MIN_WORKERS
# define MIN_WORKERS    2  /* minimum number of threads to keep around */
#endif

#define MIN_IDLE        1  /* minimum number of idle threads */

/*
**  Session states, with respect to the pool of workers.
*/

#define WKST_INIT               0       /* initial state */
#define WKST_READY_TO_RUN       1       /* command ready to be read */
#define WKST_RUNNING            2       /* session running on a worker */
#define WKST_READY_TO_WAIT      3       /* session just finished by a worker */
#define WKST_WAITING            4       /* waiting for new command */
#define WKST_CLOSING            5       /* session finished */


/*
**  Macros for threads and mutex management.
**  They all operate on the global task manager Tskmgr.
*/

#define TASKMGR_LOCK()                                                  \
        do                                                              \
        {                                                               \
                if (!smutex_lock(&Tskmgr.tm_w_mutex))                   \
                        smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");     \
        } while (0)

#define TASKMGR_UNLOCK()                                                \
        do                                                              \
        {                                                               \
                if (!smutex_unlock(&Tskmgr.tm_w_mutex))                 \
                        smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");   \
        } while (0)

/* idle wait of a worker (bounded by COND_TIMEOUT); called with the lock held */
#define TASKMGR_COND_WAIT()                                             \
        scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)

/* wake up one worker waiting in TASKMGR_COND_WAIT() */
#define TASKMGR_COND_SIGNAL()                                           \
        do                                                              \
        {                                                               \
                if (scond_signal(&Tskmgr.tm_w_cond) != 0)               \
                        smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
        } while (0)

/*
**  LAUNCH_WORKER -- start a new worker thread that will run ctx.
**      Callers hold the task manager lock and have already set
**      ctx->ctx_wstate to WKST_RUNNING.  If the thread cannot be
**      created, the session is put back into WKST_READY_TO_RUN so that
**      the next worker looking for work picks it up, instead of staying
**      marked as running with no thread running it.
*/

#define LAUNCH_WORKER(ctx)                                              \
        do                                                              \
        {                                                               \
                int r;                                                  \
                sthread_t tid;                                          \
                                                                        \
                if ((r = thread_create(&tid, mi_worker, (ctx))) != 0)   \
                {                                                       \
                        smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
                                sm_errstring(r));                       \
                        (ctx)->ctx_wstate = WKST_READY_TO_RUN;          \
                }                                                       \
        } while (0)

/*
**  POOL_LEV_DPRINTF -- debug print, emitted when lev is below the debug
**      level of the session "ctx" visible at the call site.  Several
**      callers (e.g. an idle worker) have ctx == NULL; nothing is
**      printed then, instead of dereferencing a NULL pointer.
*/

#if POOL_DEBUG
# define POOL_LEV_DPRINTF(lev, x)                                       \
        do {                                                            \
                if (ctx != NULL && (lev) < ctx->ctx_dbg)                \
                        sm_dprintf x;                                   \
        } while (0)
#else /* POOL_DEBUG */
# define POOL_LEV_DPRINTF(lev, x)
#endif /* POOL_DEBUG */

/*
**  MI_START_SESSION -- hand a new session over to the pool of workers
**
**      The session is appended to the session list and gets the next
**      session id.  If a worker is idle it is woken up to run the
**      session; otherwise a new worker thread is launched for it.
**
**      Parameters:
**              ctx -- context structure
**
**      Returns:
**              MI_SUCCESS/MI_FAILURE
*/

int
mi_start_session(ctx)
        SMFICTX_PTR ctx;
{
        static long id = 0;
        int added;

        SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
        SM_ASSERT(ctx != NULL);
        POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));

        TASKMGR_LOCK();
        added = mi_list_add_ctx(ctx);
        if (added == MI_SUCCESS)
        {
                ctx->ctx_sid = id++;

                /* nobody idle: a fresh thread runs the session */
                if (Tskmgr.tm_nb_idle <= 0)
                {
                        ctx->ctx_wstate = WKST_RUNNING;
                        LAUNCH_WORKER(ctx);
                }
                else
                {
                        ctx->ctx_wstate = WKST_READY_TO_RUN;
                        TASKMGR_COND_SIGNAL();
                }
        }
        TASKMGR_UNLOCK();

        return (added == MI_SUCCESS) ? MI_SUCCESS : MI_FAILURE;
}

/*
**  MI_CLOSE_SESSION -- Close a session and clean up data structures
**
**      The session is first unlinked from the session list and then
**      passed to mi_clr_ctx().  Callers in this file do not touch ctx
**      afterwards (presumably mi_clr_ctx() releases it -- confirm).
**
**      Parameters:
**              ctx -- context structure
**
**      Returns:
**              MI_SUCCESS (always)
*/

static int
mi_close_session(ctx)
        SMFICTX_PTR ctx;
{
        SM_ASSERT(ctx != NULL);

        /* unlink before clearing; an empty list is simply ignored */
        (void) mi_list_del_ctx(ctx);
        mi_clr_ctx(ctx);
        return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
**              Must be called before starting sessions.
**
**      Creates the event pipe, the task manager mutex and condition
**      variable, the pool controller thread and MIN_WORKERS initial
**      workers.  Does nothing if the pool is already initialized.
**
**      Parameters:
**              none
**
**      Returns:
**              MI_SUCCESS/MI_FAILURE
**              A failure to create one of the initial workers is
**              reported as MI_FAILURE, but the controller is running and
**              the pool stays initialized.
*/

int
mi_pool_controller_init()
{
        sthread_t tid;
        int r, i;

        if (Tskmgr.tm_signature == TM_SIGNATURE)
                return MI_SUCCESS;

        SM_TAILQ_INIT(&WRK_CTX_HEAD);
        Tskmgr.tm_tid = (sthread_t) -1;
        Tskmgr.tm_nb_workers = 0;
        Tskmgr.tm_nb_idle = 0;

        if (pipe(Tskmgr.tm_p) != 0)
        {
                smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
                        sm_errstring(errno));
                return MI_FAILURE;
        }

        (void) smutex_init(&Tskmgr.tm_w_mutex);
        (void) scond_init(&Tskmgr.tm_w_cond);

        /* Launch the pool controller */
        if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
        {
                smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
                        sm_errstring(r));

                /*
                **  Nothing uses these resources yet: release them so
                **  that a later call does not leak another pipe.
                */

                (void) smutex_destroy(&Tskmgr.tm_w_mutex);
                (void) scond_destroy(&Tskmgr.tm_w_cond);
                (void) close(Tskmgr.tm_p[0]);
                (void) close(Tskmgr.tm_p[1]);
                Tskmgr.tm_p[0] = -1;
                Tskmgr.tm_p[1] = -1;
                return MI_FAILURE;
        }
        Tskmgr.tm_tid = tid;
        Tskmgr.tm_signature = TM_SIGNATURE;

        /* Create the pool of workers */
        for (i = 0; i < MIN_WORKERS; i++)
        {
                if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
                {
                        smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
                                sm_errstring(r));
                        return MI_FAILURE;
                }
        }

        return MI_SUCCESS;
}

/*
**  MI_POOL_CONTROLLER -- manage the pool of workers
**      This thread must be running when listener begins
**      starting sessions
**
**      Parameters:
**              arg -- unused
**
**      Returns:
**              NULL
**
**      Control flow:
**              for (;;)
**                      Look for timed out sessions
**                      Select sessions to wait for sendmail command
**                      Poll set of file descriptors
**                      if timeout
**                              continue
**                      For each file descriptor ready
**                              launch new thread if no worker available
**                              else
**                              signal waiting worker
**
**      Locking:
**              Worker threads unlink and clear sessions (under the task
**              manager lock), so the session list is only walked here
**              with that lock held.
*/

/* Poll structure array (pollfd) size step */
#define PFD_STEP        256

#define WAIT_FD(i)      (pfd[i].fd)
#define WAITFN          "POLL"

static void *
mi_pool_controller(arg)
        void *arg;
{
        struct pollfd *pfd = NULL;
        int dim_pfd = 0;
        int nfd = 0;    /* entries of pfd in use; reused after a poll timeout */
        bool rebuild_set = true;
        int pcnt = 0; /* error count for poll() failures */
        time_t lastcheck;

        Tskmgr.tm_tid = sthread_get_id();
        if (pthread_detach(Tskmgr.tm_tid) != 0)
        {
                smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
                return NULL;
        }

        pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
        if (pfd == NULL)
        {
                smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
                        sm_errstring(errno));
                return NULL;
        }
        dim_pfd = PFD_STEP;

        lastcheck = time(NULL);
        for (;;)
        {
                SMFICTX_PTR ctx = NULL;
                int rfd, i;
                time_t now;

                POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));

                if (mi_stop() != MILTER_CONT)
                        break;

                TASKMGR_LOCK();

                now = time(NULL);

                /* check for timed out sessions? */
                if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
                {
                        ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
                        while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
                        {
                                SMFICTX_PTR ctx_nxt;

                                ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
                                if (ctx->ctx_wstate == WKST_WAITING)
                                {
                                        if (ctx->ctx_wait == 0)
                                                ctx->ctx_wait = now;
                                        else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
                                                 < now)
                                        {
                                                /* if session timed out, close it */
                                                sfsistat (*fi_close) __P((SMFICTX *));

                                                POOL_LEV_DPRINTF(4,
                                                        ("Closing old connection: sd=%d id=%d",
                                                        ctx->ctx_sd,
                                                        ctx->ctx_sid));

                                                if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
                                                        (void) (*fi_close)(ctx);

                                                mi_close_session(ctx);

                                                /*
                                                **  Its descriptor may still be
                                                **  in pfd: drop it from the set.
                                                */

                                                rebuild_set = true;
                                        }
                                }
                                ctx = ctx_nxt;
                        }
                        lastcheck = now;
                }

                if (rebuild_set)
                {
                        /*
                        **  Initialize poll set.
                        **  Insert into the poll set the file descriptors of
                        **  all sessions waiting for a command from sendmail.
                        */

                        nfd = 0;

                        /* begin with worker pipe */
                        pfd[nfd].fd = RD_PIPE;
                        pfd[nfd].events = MI_POLL_RD_FLAGS;
                        pfd[nfd].revents = 0;
                        nfd++;

                        SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
                        {
                                /*
                                **  update ctx_wait - start of wait moment -
                                **  for timeout
                                */

                                if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
                                        ctx->ctx_wait = now;

                                /* add the session to the pollfd array? */
                                if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
                                    (ctx->ctx_wstate == WKST_WAITING))
                                {
                                        /*
                                        **  Resize the pollfd array if it
                                        **  isn't large enough.
                                        */

                                        if (nfd >= dim_pfd)
                                        {
                                                struct pollfd *tpfd;
                                                size_t new;

                                                new = (dim_pfd + PFD_STEP) *
                                                        sizeof(*tpfd);
                                                tpfd = (struct pollfd *)
                                                        realloc(pfd, new);
                                                if (tpfd != NULL)
                                                {
                                                        pfd = tpfd;
                                                        dim_pfd += PFD_STEP;
                                                }
                                                else
                                                {
                                                        smi_log(SMI_LOG_ERR,
                                                                "Failed to realloc pollfd array:%s",
                                                                sm_errstring(errno));
                                                }
                                        }

                                        /* add the session to pollfd array */
                                        if (nfd < dim_pfd)
                                        {
                                                ctx->ctx_wstate = WKST_WAITING;
                                                pfd[nfd].fd = ctx->ctx_sd;
                                                pfd[nfd].events = MI_POLL_RD_FLAGS;
                                                pfd[nfd].revents = 0;
                                                nfd++;
                                        }
                                }
                        }
                        rebuild_set = false;
                }

                TASKMGR_UNLOCK();

                /* Everything is ready, let's wait for an event */
                rfd = poll(pfd, nfd, POLL_TIMEOUT);

                POOL_LEV_DPRINTF(4, ("%s returned: at epoch %ld value %d",
                        WAITFN, (long) now, rfd));

                /* timeout: keep the current poll set (and nfd) */
                if (rfd == 0)
                        continue;

                rebuild_set = true;

                /* error */
                if (rfd < 0)
                {
                        if (errno == EINTR)
                                continue;
                        pcnt++;
                        smi_log(SMI_LOG_ERR,
                                "%s() failed (%s), %s",
                                WAITFN, sm_errstring(errno),
                                pcnt >= MAX_FAILS_S ? "abort" : "try again");

                        if (pcnt >= MAX_FAILS_S)
                                goto err;

                        /*
                        **  revents is not meaningful after a failed poll(),
                        **  and falling through would reset pcnt so that
                        **  MAX_FAILS_S could never be reached.
                        */

                        continue;
                }
                pcnt = 0;

                /* something happened */
                for (i = 0; i < nfd; i++)
                {
                        if (pfd[i].revents == 0)
                                continue;

                        POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
                                WAITFN, i, nfd,
                                WAIT_FD(i)));

                        /* has a worker signaled an end of task ? */
                        if (WAIT_FD(i) == RD_PIPE)
                        {
                                char evts[256];
                                ssize_t r = 0;

                                POOL_LEV_DPRINTF(4,
                                        ("PIPE WILL READ evt = %08X %08X",
                                        pfd[i].events, pfd[i].revents));

                                /*
                                **  Each worker that finished a task wrote
                                **  one byte; consume all pending bytes in
                                **  one read() instead of one per poll()
                                **  round.  The poll set is rebuilt anyway.
                                */

                                if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
                                        r = read(RD_PIPE, evts, sizeof(evts));

                                POOL_LEV_DPRINTF(4,
                                        ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
                                        i, RD_PIPE, (int) r,
                                        r > 0 ? evts[0] : 0));

                                if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
                                {
                                        /* Exception handling */
                                }
                                continue;
                        }

                        /*
                        **  no ! sendmail wants to send a command.
                        **  Find the session and hand it to a worker; the
                        **  whole search runs under the lock because
                        **  workers unlink and clear sessions concurrently.
                        */

                        TASKMGR_LOCK();
                        SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
                        {
                                if (ctx->ctx_wstate != WKST_WAITING)
                                        continue;

                                POOL_LEV_DPRINTF(4,
                                        ("Checking context sd=%d - fd=%d ",
                                        ctx->ctx_sd , WAIT_FD(i)));

                                if (ctx->ctx_sd == pfd[i].fd)
                                {
                                        POOL_LEV_DPRINTF(4,
                                                ("TASK: found %d for fd[%d]=%d",
                                                ctx->ctx_sid, i, WAIT_FD(i)));

                                        if (Tskmgr.tm_nb_idle > 0)
                                        {
                                                ctx->ctx_wstate = WKST_READY_TO_RUN;
                                                TASKMGR_COND_SIGNAL();
                                        }
                                        else
                                        {
                                                ctx->ctx_wstate = WKST_RUNNING;
                                                LAUNCH_WORKER(ctx);
                                        }
                                        break;
                                }
                        }

                        POOL_LEV_DPRINTF(4,
                                ("TASK %s FOUND - Checking PIPE for fd[%d]",
                                ctx != NULL ? "" : "NOT", WAIT_FD(i)));
                        TASKMGR_UNLOCK();
                }
        }

  err:
        free(pfd);

        Tskmgr.tm_signature = 0;

        /*
        **  Close the sessions no worker is working on.  Sessions in state
        **  WKST_RUNNING are still referenced by a worker (mi_worker), so
        **  closing them here could free a session that is in use.
        */

        TASKMGR_LOCK();
        {
                SMFICTX_PTR ctx, ctx_nxt;

                for (ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
                     ctx != SM_TAILQ_END(&WRK_CTX_HEAD);
                     ctx = ctx_nxt)
                {
                        ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
                        if (ctx->ctx_wstate != WKST_RUNNING)
                                mi_close_session(ctx);
                }
        }
        TASKMGR_UNLOCK();

        /*
        **  NOTE(review): worker threads may still be waiting on, or later
        **  lock, the mutex/condition variable destroyed below.
        */

        (void) smutex_destroy(&Tskmgr.tm_w_mutex);
        (void) scond_destroy(&Tskmgr.tm_w_cond);

        return NULL;
}

/*
**  Look for a task ready to run.
**  Value of ctx is NULL or a pointer to a task ready to run.
*/

#define GET_TASK_READY_TO_RUN()                                 \
        SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)          \
        {                                                       \
                if (ctx->ctx_wstate == WKST_READY_TO_RUN)       \
                {                                               \
                        ctx->ctx_wstate = WKST_RUNNING;         \
                        break;                                  \
                }                                               \
        }

/*
**  MI_WORKER -- worker thread
**      executes tasks distributed by the mi_pool_controller
**      or by mi_start_session
**
**      A worker started with a session runs it first.  Afterwards (or
**      right away when started with NULL) it repeatedly claims sessions
**      in state WKST_READY_TO_RUN, waiting on the task manager condition
**      variable when there is none.  It exits when the milter stops, or
**      when there are more than MIN_WORKERS workers and more than
**      MIN_IDLE of them are idle.
**
**      Parameters:
**              arg -- pointer to context structure (may be NULL)
**
**      Returns:
**              NULL pointer
*/

static void *
mi_worker(arg)
        void *arg;
{
        SMFICTX_PTR ctx;
        bool done;
        sthread_t t_id;

        ctx = (SMFICTX_PTR) arg;
        done = false;

        /*
        **  A session passed in was already set to WKST_RUNNING by the
        **  caller of LAUNCH_WORKER() under the task manager lock; it is not
        **  written again here without the lock.
        */

        t_id = sthread_get_id();
        if (pthread_detach(t_id) != 0)
        {
                smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
                if (ctx != NULL)
                {
                        /*
                        **  Give the session back so that another worker
                        **  runs it.  ctx_wstate is shared state: change it
                        **  under the lock and wake an idle worker if any.
                        */

                        TASKMGR_LOCK();
                        ctx->ctx_wstate = WKST_READY_TO_RUN;
                        if (Tskmgr.tm_nb_idle > 0)
                                TASKMGR_COND_SIGNAL();
                        TASKMGR_UNLOCK();
                }
                return NULL;
        }

        TASKMGR_LOCK();
        Tskmgr.tm_nb_workers++;
        TASKMGR_UNLOCK();

        while (!done)
        {
                if (mi_stop() != MILTER_CONT)
                        break;

                /* let's handle next task... */
                if (ctx != NULL)
                {
                        int res;

                        POOL_LEV_DPRINTF(4,
                                ("worker %d: new task -> let's handle it",
                                t_id));
                        res = mi_engine(ctx);
                        POOL_LEV_DPRINTF(4,
                                ("worker %d: mi_engine returned %d", t_id, res));

                        TASKMGR_LOCK();
                        if (res != MI_CONTINUE)
                        {
                                ctx->ctx_wstate = WKST_CLOSING;

                                /*
                                **  Delete context from linked list of
                                **  sessions and close session.
                                */

                                mi_close_session(ctx);
                        }
                        else
                        {
                                ctx->ctx_wstate = WKST_READY_TO_WAIT;

                                POOL_LEV_DPRINTF(4,
                                        ("writing to event pipe..."));

                                /*
                                **  Signal task controller to add new session
                                **  to poll set.
                                */

                                PIPE_SEND_SIGNAL();
                        }
                        TASKMGR_UNLOCK();
                        ctx = NULL;
                }

                /* check if there is any task waiting to be served */
                TASKMGR_LOCK();

                GET_TASK_READY_TO_RUN();

                /* Got a task? */
                if (ctx != NULL)
                {
                        TASKMGR_UNLOCK();
                        continue;
                }

                /*
                **  if not, let's check if there is enough idle workers
                **      if yes: quit
                */

                if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
                    Tskmgr.tm_nb_idle > MIN_IDLE)
                        done = true;

                POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
                        Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));

                if (done)
                {
                        POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
                        Tskmgr.tm_nb_workers--;
                        TASKMGR_UNLOCK();
                        continue;
                }

                /*
                **  if no task ready to run, wait for another one
                **  (tm_nb_idle tells other threads a worker can be signalled)
                */

                Tskmgr.tm_nb_idle++;
                TASKMGR_COND_WAIT();
                Tskmgr.tm_nb_idle--;

                /* look for a task */
                GET_TASK_READY_TO_RUN();

                TASKMGR_UNLOCK();
        }
        return NULL;
}

/*
**  MI_LIST_ADD_CTX -- append a new session to the session list
**
**      Parameters:
**              ctx -- context structure (must not be NULL)
**
**      Returns:
**              MI_SUCCESS (insertion cannot fail)
*/

static int
mi_list_add_ctx(ctx)
        SMFICTX_PTR ctx;
{
        SM_ASSERT(ctx != NULL);

        /* newest sessions go to the tail */
        SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);

        return MI_SUCCESS;
}

/*
**  MI_LIST_DEL_CTX -- unlink a finished session from the session list
**
**      Parameters:
**              ctx -- context structure (must not be NULL)
**
**      Returns:
**              MI_FAILURE if the list is empty, MI_SUCCESS otherwise
*/

static int
mi_list_del_ctx(ctx)
        SMFICTX_PTR ctx;
{
        int status = MI_FAILURE;

        SM_ASSERT(ctx != NULL);
        if (!SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
        {
                SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
                status = MI_SUCCESS;
        }
        return status;
}
#endif /* _FFR_WORKERS_POOL */