/* root/usr/src/lib/lib9p/common/threadpool.c */
/*
 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
 * All rights reserved
 *
 * Copyright 2020 Joyent, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

/*
 * Responder thread: drains the reply queue, sending each completed
 * request back to the client.  There is exactly one of these per pool.
 */
static void *
l9p_responder(void *arg)
{
        struct l9p_worker *self = arg;
        struct l9p_threadpool *pool = self->ltw_tp;
        struct l9p_request *rp;

        for (;;) {
                if (pthread_mutex_lock(&pool->ltp_mtx) != 0)
                        break;

                /* Sleep until there is a reply to send, or we're told to go. */
                while (STAILQ_EMPTY(&pool->ltp_replyq) && !self->ltw_exiting) {
                        (void) pthread_cond_wait(&pool->ltp_reply_cv,
                            &pool->ltp_mtx);
                }
                if (self->ltw_exiting) {
                        (void) pthread_mutex_unlock(&pool->ltp_mtx);
                        break;
                }

                /* Detach the head of the reply queue. */
                rp = STAILQ_FIRST(&pool->ltp_replyq);
                STAILQ_REMOVE_HEAD(&pool->ltp_replyq, lr_worklink);

                /*
                 * Once marked REPLYING the request is committed to going
                 * out and can no longer be affected by a Tflush.
                 */
                rp->lr_workstate = L9P_WS_REPLYING;

                /* Release any Tflush requests waiting on this one. */
                if (rp->lr_flushstate != L9P_FLUSH_NONE)
                        l9p_threadpool_rflush(pool, rp);

                if (pthread_mutex_unlock(&pool->ltp_mtx) != 0)
                        break;

                /* Transmit the response outside the lock. */
                l9p_respond(rp, false, true);
        }
        return (NULL);
}

/*
 * Worker thread: pulls requests off the work queue, dispatches them,
 * and hands the finished request to the responder via the reply queue.
 *
 * Lock discipline: ltp_mtx is held on entry to each loop iteration
 * (taken once before the loop, and re-taken after dispatching each
 * request), and is released for the duration of l9p_dispatch_request().
 */
static void *
l9p_worker(void *arg)
{
        struct l9p_threadpool *tp;
        struct l9p_worker *worker = arg;
        struct l9p_request *req;

        tp = worker->ltw_tp;
        if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
                return (NULL);
        for (;;) {
                /* Wait for work, or for shutdown to be requested. */
                while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting) {
                        (void) pthread_cond_wait(&tp->ltp_work_cv,
                            &tp->ltp_mtx);
                }
                if (worker->ltw_exiting)
                        break;

                /* off work queue; now work-in-progress, by us */
                req = STAILQ_FIRST(&tp->ltp_workq);
                STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
                req->lr_workstate = L9P_WS_INPROGRESS;
                req->lr_worker = worker;
                (void) pthread_mutex_unlock(&tp->ltp_mtx);

                /* actually try the request */
                req->lr_error = l9p_dispatch_request(req);

                /* move to responder queue, updating work-state */
                if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
                        return (NULL);
                req->lr_workstate = L9P_WS_RESPQUEUED;
                req->lr_worker = NULL;
                STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

                /* signal the responder */
                (void) pthread_cond_signal(&tp->ltp_reply_cv);
        }
        (void) pthread_mutex_unlock(&tp->ltp_mtx);
        return (NULL);
}

/*
 * Just before finally replying to a request that got touched by one or
 * more Tflush requests, move those flushers (queued on the flushee's
 * lr_flushq) onto the response queue so they get answered too.
 *
 * Called with tp->ltp_mtx held (from the responder thread).
 *
 * Per https://swtch.com/plan9port/man/man9/flush.html, multiple
 * Tflushes for one request must be answered in order, and answering
 * the last implies answers for all earlier ones; so we could drop all
 * but the final flusher.  Answering each of them in order is equally
 * valid and simpler, so that is what we do.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
        struct l9p_request *fr;

        for (fr = STAILQ_FIRST(&req->lr_flushq); fr != NULL;
            fr = STAILQ_NEXT(fr, lr_flushlink)) {
                fr->lr_workstate = L9P_WS_RESPQUEUED;
                STAILQ_INSERT_TAIL(&tp->ltp_replyq, fr, lr_worklink);
        }
}

/*
 * Initialize a thread pool: one responder thread plus "size" worker
 * threads.  Returns 0 on success or an errno value on failure; on
 * failure any partially-constructed state is torn down.
 */
int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
        struct l9p_worker *worker;
#if defined(__FreeBSD__)
        char threadname[16];
#endif
        int error;
        int i, nworkers, nresponders;

        if (size <= 0)
                return (EINVAL);
#ifdef __illumos__
        pthread_mutexattr_t attr;

        if ((error = pthread_mutexattr_init(&attr)) != 0)
                return (error);
        if ((error = pthread_mutexattr_settype(&attr,
            PTHREAD_MUTEX_ERRORCHECK)) != 0) {
                /* don't leak the attribute object on the error path */
                (void) pthread_mutexattr_destroy(&attr);
                return (error);
        }
        error = pthread_mutex_init(&tp->ltp_mtx, &attr);
        /* the attribute object is no longer needed once the mutex exists */
        (void) pthread_mutexattr_destroy(&attr);
#else
        error = pthread_mutex_init(&tp->ltp_mtx, NULL);
#endif
        if (error)
                return (error);
        error = pthread_cond_init(&tp->ltp_work_cv, NULL);
        if (error)
                goto fail_work_cv;
        error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
        if (error)
                goto fail_reply_cv;

        STAILQ_INIT(&tp->ltp_workq);
        STAILQ_INIT(&tp->ltp_replyq);
        LIST_INIT(&tp->ltp_workers);

        nresponders = 0;
        nworkers = 0;
        /* i == 0 creates the responder; i == 1..size create the workers */
        for (i = 0; i <= size; i++) {
                worker = calloc(1, sizeof(struct l9p_worker));
                if (worker == NULL) {
                        /*
                         * Record why we stopped: without this, "error"
                         * could still be 0 from the earlier successful
                         * calls and we would report success below even
                         * though the pool is incomplete.
                         */
                        error = ENOMEM;
                        break;
                }
                worker->ltw_tp = tp;
                worker->ltw_responder = i == 0;
                error = pthread_create(&worker->ltw_thread, NULL,
                    worker->ltw_responder ? l9p_responder : l9p_worker,
                    (void *)worker);
                if (error) {
                        free(worker);
                        break;
                }
                if (worker->ltw_responder)
                        nresponders++;
                else
                        nworkers++;

#if defined(__FreeBSD__)
                if (worker->ltw_responder) {
                        pthread_set_name_np(worker->ltw_thread, "9p-responder");
                } else {
                        /*
                         * snprintf, not sprintf: "9p-worker:" plus a
                         * large int can exceed the 16-byte buffer.
                         */
                        (void) snprintf(threadname, sizeof(threadname),
                            "9p-worker:%d", i - 1);
                        pthread_set_name_np(worker->ltw_thread, threadname);
                }
#elif defined(__illumos__)
                if (worker->ltw_responder) {
                        (void) pthread_setname_np(worker->ltw_thread,
                            "9p-responder");
                } else {
                        char threadname[PTHREAD_MAX_NAMELEN_NP];

                        (void) snprintf(threadname, sizeof (threadname),
                            "9p-worker:%d", i - 1);
                        (void) pthread_setname_np(worker->ltw_thread,
                            threadname);
                }
#endif

                LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
        }
        if (nresponders == 0 || nworkers == 0) {
                /* need the one responder, and at least one worker */
                l9p_threadpool_shutdown(tp);
                return (error);
        }
        return (0);

        /*
         * We could avoid these labels by having multiple destroy
         * paths (one for each error case), or by having booleans
         * for which variables were initialized.  Neither is very
         * appealing...
         */
fail_reply_cv:
        (void) pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
        (void) pthread_mutex_destroy(&tp->ltp_mtx);

        return (error);
}

/*
 * Run a request, usually by queueing it for a worker thread.
 *
 * Tflush is the exception: it can cancel / kill off regular requests,
 * so it is dispatched immediately on the caller's thread (still via
 * the regular dispatch mechanism).
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

        if (req->lr_req.hdr.type == L9P_TFLUSH) {
                /* not on a work queue yet so we can touch state */
                req->lr_workstate = L9P_WS_IMMEDIATE;
                (void) l9p_dispatch_request(req);
                return;
        }

        if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
                return;
        req->lr_workstate = L9P_WS_NOTSTARTED;
        STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
        (void) pthread_cond_signal(&tp->ltp_work_cv);
        (void) pthread_mutex_unlock(&tp->ltp_mtx);
}

/*
 * Run a Tflush request.  Called via l9p_dispatch_request() since
 * it has some debug code in it, but not called from worker thread.
 *
 * Returns 0 on success or an errno value if a lock operation fails.
 * Note that the Tflush itself always "succeeds" at the protocol
 * level; the return value reflects internal errors only.
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
        struct l9p_connection *conn;
        struct l9p_threadpool *tp;
        struct l9p_request *flushee;
        uint16_t oldtag;
        enum l9p_flushstate nstate = L9P_FLUSH_NONE;
        int err;

        /*
         * Find what we're supposed to flush (the flushee, as it were).
         */
        req->lr_error = 0;      /* Tflush always succeeds */
        conn = req->lr_conn;
        tp = &conn->lc_tp;
        oldtag = req->lr_req.tflush.oldtag;
        if ((err = ht_wrlock(&conn->lc_requests)) != 0)
                return (err);
        flushee = ht_find_locked(&conn->lc_requests, oldtag);
        if (flushee == NULL) {
                /*
                 * Nothing to flush!  The old request must have
                 * been done and gone already.  Just queue this
                 * Tflush for a success reply.
                 */
                (void) ht_unlock(&conn->lc_requests);
                if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0)
                        return (err);
                goto done;
        }

        /*
         * Found the original request.  We'll need to inspect its
         * work-state to figure out what to do.
         *
         * Lock order: take the pool mutex while still holding the
         * request-table lock, so the flushee cannot transition to
         * REPLYING (and be freed) between our lookup and our
         * inspection of it below.
         */
        if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0) {
                (void) ht_unlock(&conn->lc_requests);
                return (err);
        }
        (void) ht_unlock(&conn->lc_requests);

        switch (flushee->lr_workstate) {

        case L9P_WS_NOTSTARTED:
                /*
                 * Flushee is on work queue, but not yet being
                 * handled by a worker.
                 *
                 * The documentation -- see
                 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
                 * https://swtch.com/plan9port/man/man9/flush.html
                 * -- says that "the server should answer the
                 * flush message immediately".  However, Linux
                 * sends flush requests for operations that
                 * must finish, such as Tclunk, and it's not
                 * possible to *answer* the flush request until
                 * it has been handled (if necessary) or aborted
                 * (if allowed).
                 *
                 * We therefore now just mark the original request
                 * and let the request-handler do whatever is
                 * appropriate.  NOTE: we could have a table of
                 * "requests that can be aborted without being
                 * run" vs "requests that must be run to be
                 * aborted", but for now that seems like an
                 * unnecessary complication.
                 */
                nstate = L9P_FLUSH_REQUESTED_PRE_START;
                break;

        case L9P_WS_IMMEDIATE:
                /*
                 * This state only applies to Tflush requests, and
                 * flushing a Tflush is illegal.  But we'll do nothing
                 * special here, which will make us act like a flush
                 * request for the flushee that arrived too late to
                 * do anything about the flushee.
                 */
                nstate = L9P_FLUSH_REQUESTED_POST_START;
                break;

        case L9P_WS_INPROGRESS:
                /*
                 * Worker thread flushee->lr_worker is working on it.
                 * Kick it to get it out of blocking system calls.
                 * (This requires that it carefully set up some
                 * signal handlers, and may be FreeBSD-dependent,
                 * it probably cannot be handled this way on MacOS.)
                 */
#ifdef notyet
                pthread_kill(...);
#endif
                nstate = L9P_FLUSH_REQUESTED_POST_START;
                break;

        case L9P_WS_RESPQUEUED:
                /*
                 * The flushee is already in the response queue.
                 * We'll just mark it as having had some flush
                 * action applied.
                 */
                nstate = L9P_FLUSH_TOOLATE;
                break;

        case L9P_WS_REPLYING:
                /*
                 * Although we found the flushee, it's too late to
                 * make us depend on it: it's already heading out
                 * the door as a reply.
                 *
                 * We don't want to do anything to the flushee.
                 * Instead, we want to work the same way as if
                 * we had never found the tag.
                 */
                goto done;
        }

        /*
         * Now add us to the list of Tflush-es that are waiting
         * for the flushee (creating the list if needed, i.e., if
         * this is the first Tflush for the flushee).  We (req)
         * will get queued for reply later, when the responder
         * processes the flushee and calls l9p_threadpool_rflush().
         */
        if (flushee->lr_flushstate == L9P_FLUSH_NONE)
                STAILQ_INIT(&flushee->lr_flushq);
        flushee->lr_flushstate = nstate;
        STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

        (void) pthread_mutex_unlock(&tp->ltp_mtx);

        return (0);

done:
        /*
         * This immediate op is ready to be replied-to now, so just
         * stick it onto the reply queue.
         *
         * Signaling after the unlock is fine: the responder
         * re-checks the queue under the mutex before sleeping.
         */
        req->lr_workstate = L9P_WS_RESPQUEUED;
        STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
        (void) pthread_mutex_unlock(&tp->ltp_mtx);
        (void) pthread_cond_signal(&tp->ltp_reply_cv);
        return (0);
}

/*
 * Tear down a thread pool: ask each thread (responder and workers) to
 * exit, join it, and free its bookkeeping; then destroy the pool's
 * condition variables and mutex.  Always returns 0.
 */
int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
        struct l9p_worker *w, *wnext;

        LIST_FOREACH_SAFE(w, &tp->ltp_workers, ltw_link, wnext) {
                if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
                        continue;
                /* Flag the thread, then wake it on the CV it sleeps on. */
                w->ltw_exiting = true;
                if (w->ltw_responder) {
                        (void) pthread_cond_signal(&tp->ltp_reply_cv);
                } else {
                        (void) pthread_cond_broadcast(&tp->ltp_work_cv);
                }
                (void) pthread_mutex_unlock(&tp->ltp_mtx);
                (void) pthread_join(w->ltw_thread, NULL);
                LIST_REMOVE(w, ltw_link);
                free(w);
        }

        (void) pthread_cond_destroy(&tp->ltp_reply_cv);
        (void) pthread_cond_destroy(&tp->ltp_work_cv);
        (void) pthread_mutex_destroy(&tp->ltp_mtx);

        return (0);
}