root/usr/src/lib/mergeq/mergeq.c
/*
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source.  A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 */

/*
 * Copyright 2015 Joyent, Inc.
 */

/*
 * Merge queue
 *
 * A multi-threaded merging queue.
 *
 * The general constraint of the merge queue is that if a set of items are
 * inserted into the queue in the same order, then no matter how many threads
 * are on the scene, we will always process the items in the same order. The
 * secondary constraint is that to support environments that must be
 * single-threaded, we explicitly *must not* create a thread in the case where
 * the number of requested threads is just one.
 *
 * To that end, we've designed our queue as a circular buffer. We will grow that
 * buffer to contain enough space for all the input items, after which we'll
 * then treat it as a circular buffer.
 *
 * Items will be issued to a processing function two at a time, until there is
 * only one item remaining in the queue, at which point we will be doing any
 * merging work.
 *
 * A given queue has three different entries that we care about tracking:
 *
 * o mq_nproc   - What is the slot of the next item to process for something
 *                looking for work.
 *
 * o mq_next    - What is the slot of the next item that should be inserted into
 *                the queue.
 *
 * o mq_ncommit - What is the slot of the next item that should be committed.
 *
 * When a thread comes and looks for work, we pop entries off of the queue based
 * on the index provided by mq_nproc. At the same time, it also gets the slot
 * that it should place the result in, which is mq_next. However, because we
 * have multiple threads that are operating on the system, we want to make sure
 * that we push things onto the queue in order. We do that by allocating a slot
 * to each task and when it completes, it waits for its slot to be ready based
 * on it being the value of mq_ncommit.
 *
 * In addition, we keep track of the number of items in the queue as well as the
 * number of active workers. There's also a generation count that is used to
 * figure out when the various values might lap one another.
 *
 * The following images show what happens when we have a queue with six items
 * and whose capacity has been shrunk to six, to better fit in the screen.
 *
 *
 * 1) This is the initial configuration of the queue right before any processing
 * is done in the context of mergeq_merge(). Every box has an initial item for
 * merging in it (represented by an 'x'). Here, the mq_nproc, mq_next, and
 * mq_ncommit will all point at the initial entry. However, the mq_next has
 * already lapped around the array and thus has a generation count of one.
 *
 * The '+' characters indicate which bucket the corresponding value of mq_nproc,
 * mq_ncommit, and mq_nproc.
 *
 *                     +---++---++---++---++---++---+
 *                     | X || X || X || X || X || X |
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)   +
 *     mq_ncommit (g0)   +
 *       mq_nproc (g0)   +
 *
 * 2) This shows the state right as the first thread begins to process an entry.
 * Note in this example we will have two threads processing this queue. Note,
 * mq_ncommit has not advanced. This is because the first thread has started
 * processing entries, but it has not finished, and thus we can't commit it.
 * We've incremented mq_next by one because it has gone ahead and assigned a
 * single entry. We've incremented mq_nproc by two, because we have removed two
 * entries and thus will have another set available.
 *
 *                     +---++---++---++---++---++---+     t1 - slot 0
 *                     |   ||   || X || X || X || X |     t2 - idle
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)        +
 *     mq_ncommit (g0)   +
 *       mq_nproc (g0)             +
 *
 *
 * 3) This shows the state right after the second thread begins to process an
 * entry, note that the first thread has not finished. The changes are very
 * similar to the previous state, we've advanced, mq_nproc and mq_next, but not
 * mq_ncommit.
 *
 *                     +---++---++---++---++---++---+     t1 - slot 0
 *                     |   ||   ||   ||   || X || X |     t2 - slot 1
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)             +
 *     mq_ncommit (g0)   +
 *       mq_nproc (g0)                       +
 *
 * 4) This shows the state after thread one has finished processing an item, but
 * before it does anything else. Note that even if thread two finishes early, it
 * cannot commit its item until thread one finishes. Here 'Y' refers to the
 * result of merging the first two 'X's.
 *
 *                     +---++---++---++---++---++---+     t1 - idle
 *                     | Y ||   ||   ||   || X || X |     t2 - slot 1
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)             +
 *     mq_ncommit (g0)        +
 *       mq_nproc (g0)                       +
 *
 * 5) This shows the state after thread one has begun to process the next round
 * and after thread two has committed, but before it begins processing the next
 * item. Note that mq_nproc has wrapped around and we've bumped its generation
 * counter.
 *
 *                     +---++---++---++---++---++---+     t1 - slot 2
 *                     | Y || Y ||   ||   ||   ||   |     t2 - idle
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)                  +
 *     mq_ncommit (g0)             +
 *       mq_nproc (g0)   +
 *
 * 6) Here, thread two, will take the next two Y values and thread 1 will commit
 * its 'Y'. Thread one now must wait until thread two finishes such that it can
 * do additional work.
 *
 *                     +---++---++---++---++---++---+     t1 - waiting
 *                     |   ||   || Y ||   ||   ||   |     t2 - slot 3
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)                       +
 *     mq_ncommit (g0)                  +
 *       mq_nproc (g0)             +
 *
 * 7) Here, thread two has committed and thread one is about to go process the
 * final entry. The character 'Z' represents the results of merging two 'Y's.
 *
 *                     +---++---++---++---++---++---+     t1 - idle
 *                     |   ||   || Y || Z ||   ||   |     t2 - idle
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)                       +
 *     mq_ncommit (g0)                       +
 *       mq_nproc (g0)             +
 *
 * 8) Here, thread one is processing the final item. Thread two is waiting in
 * mergeq_pop() for enough items to be available. In this case, it will never
 * happen; however, once all threads have finished it will break out.
 *
 *                     +---++---++---++---++---++---+     t1 - slot 4
 *                     |   ||   ||   ||   ||   ||   |     t2 - idle
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)                            +
 *     mq_ncommit (g0)                       +
 *       mq_nproc (g0)                       +
 *
 * 9) This is the final state of the queue, it has a single '*' item which is
 * the final merge result. At this point, both thread one and thread two would
 * stop processing and we'll return the result to the user.
 *
 *                     +---++---++---++---++---++---+     t1 - slot 4
 *                     |   ||   ||   ||   || * ||   |     t2 - idle
 *                     +---++---++---++---++---++---+
 *        mq_next (g1)                            +
 *     mq_ncommit (g0)                       +
 *       mq_nproc (g0)                       +
 *
 *
 * Note, that if at any point in time the processing function fails, then all
 * the merges will quiesce and that error will be propagated back to the user.
 */

#include <strings.h>
#include <sys/debug.h>
#include <thread.h>
#include <synch.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

#include "mergeq.h"

struct mergeq {
        mutex_t mq_lock;        /* Protects items below */
        cond_t  mq_cond;        /* Condition variable */
        void **mq_items;        /* Array of items to process */
        size_t mq_nitems;       /* Number of items in the queue */
        size_t mq_cap;          /* Capacity of the items */
        size_t mq_next;         /* Place to put next entry */
        size_t mq_gnext;        /* Generation for next */
        size_t mq_nproc;        /* Index of next thing to process */
        size_t mq_gnproc;       /* Generation for next proc */
        size_t mq_ncommit;      /* Index of the next thing to commit */
        size_t mq_gncommit;     /* Commit generation */
        uint_t mq_nactthrs;     /* Number of active threads */
        uint_t mq_ndthreads;    /* Desired number of threads */
        thread_t *mq_thrs;      /* Actual threads */
        mergeq_proc_f *mq_func; /* Processing function */
        void *mq_arg;           /* Argument for processing */
        boolean_t mq_working;   /* Are we working on processing */
        boolean_t mq_iserror;   /* Have we encountered an error? */
        int mq_error;
};

#define MERGEQ_DEFAULT_CAP      64

static int
mergeq_error(int err)
{
        errno = err;
        return (MERGEQ_ERROR);
}

void
mergeq_fini(mergeq_t *mqp)
{
        if (mqp == NULL)
                return;

        VERIFY(mqp->mq_working != B_TRUE);

        if (mqp->mq_items != NULL)
                mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
        if (mqp->mq_ndthreads > 0) {
                mergeq_free(mqp->mq_thrs, sizeof (thread_t) *
                    mqp->mq_ndthreads);
        }
        VERIFY0(cond_destroy(&mqp->mq_cond));
        VERIFY0(mutex_destroy(&mqp->mq_lock));
        mergeq_free(mqp, sizeof (mergeq_t));
}

int
mergeq_init(mergeq_t **outp, uint_t nthrs)
{
        int ret;
        mergeq_t *mqp;

        mqp = mergeq_alloc(sizeof (mergeq_t));
        if (mqp == NULL)
                return (mergeq_error(ENOMEM));

        bzero(mqp, sizeof (mergeq_t));
        mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP);
        if (mqp->mq_items == NULL) {
                mergeq_free(mqp, sizeof (mergeq_t));
                return (mergeq_error(ENOMEM));
        }
        bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP);

        mqp->mq_ndthreads = nthrs - 1;
        if (mqp->mq_ndthreads > 0) {
                mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) *
                    mqp->mq_ndthreads);
                if (mqp->mq_thrs == NULL) {
                        mergeq_free(mqp->mq_items, sizeof (void *) *
                            MERGEQ_DEFAULT_CAP);
                        mergeq_free(mqp, sizeof (mergeq_t));
                        return (mergeq_error(ENOMEM));
                }
        }

        if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
            NULL)) != 0) {
                if (mqp->mq_ndthreads > 0) {
                        mergeq_free(mqp->mq_thrs,
                            sizeof (thread_t) * mqp->mq_ndthreads);
                }
                mergeq_free(mqp->mq_items, sizeof (void *) *
                    MERGEQ_DEFAULT_CAP);
                mergeq_free(mqp, sizeof (mergeq_t));
                return (mergeq_error(ret));
        }

        if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) {
                VERIFY0(mutex_destroy(&mqp->mq_lock));
                if (mqp->mq_ndthreads > 0) {
                        mergeq_free(mqp->mq_thrs,
                            sizeof (thread_t) * mqp->mq_ndthreads);
                }
                mergeq_free(mqp->mq_items, sizeof (void *) *
                    MERGEQ_DEFAULT_CAP);
                mergeq_free(mqp, sizeof (mergeq_t));
                return (mergeq_error(ret));
        }

        mqp->mq_cap = MERGEQ_DEFAULT_CAP;
        *outp = mqp;
        return (0);
}

static void
mergeq_reset(mergeq_t *mqp)
{
        VERIFY(MUTEX_HELD(&mqp->mq_lock));
        VERIFY(mqp->mq_working == B_FALSE);
        if (mqp->mq_cap != 0)
                bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
        mqp->mq_nitems = 0;
        mqp->mq_next = 0;
        mqp->mq_gnext = 0;
        mqp->mq_nproc = 0;
        mqp->mq_gnproc = 0;
        mqp->mq_ncommit = 0;
        mqp->mq_gncommit = 0;
        mqp->mq_func = NULL;
        mqp->mq_arg = NULL;
        mqp->mq_iserror = B_FALSE;
        mqp->mq_error = 0;
}

static int
mergeq_grow(mergeq_t *mqp)
{
        size_t ncap;
        void **items;

        VERIFY(MUTEX_HELD(&mqp->mq_lock));
        VERIFY(mqp->mq_working == B_FALSE);

        if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP)
                return (ENOSPC);

        ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP;
        items = mergeq_alloc(ncap * sizeof (void *));
        if (items == NULL)
                return (ENOMEM);

        bzero(items, ncap * sizeof (void *));
        bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *));
        mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *));
        mqp->mq_items = items;
        mqp->mq_cap = ncap;
        return (0);
}

int
mergeq_add(mergeq_t *mqp, void *item)
{
        VERIFY0(mutex_lock(&mqp->mq_lock));
        if (mqp->mq_working == B_TRUE) {
                VERIFY0(mutex_unlock(&mqp->mq_lock));
                return (mergeq_error(ENXIO));
        }

        if (mqp->mq_next == mqp->mq_cap) {
                int ret;

                if ((ret = mergeq_grow(mqp)) != 0) {
                        VERIFY0(mutex_unlock(&mqp->mq_lock));
                        return (mergeq_error(ret));
                }
        }
        mqp->mq_items[mqp->mq_next] = item;
        mqp->mq_next++;
        mqp->mq_nitems++;

        VERIFY0(mutex_unlock(&mqp->mq_lock));
        return (0);
}

static size_t
mergeq_slot(mergeq_t *mqp)
{
        size_t s;

        VERIFY(MUTEX_HELD(&mqp->mq_lock));
        VERIFY(mqp->mq_next < mqp->mq_cap);

        /*
         * This probably should be a cv / wait thing.
         */
        VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap);

        s = mqp->mq_next;
        mqp->mq_next++;
        if (mqp->mq_next == mqp->mq_cap) {
                mqp->mq_next %= mqp->mq_cap;
                mqp->mq_gnext++;
        }

        return (s);
}

/*
 * Internal function to push items onto the queue which is now a circular
 * buffer. This should only be used once we begin working on the queue.
 */
static void
mergeq_push(mergeq_t *mqp, size_t slot, void *item)
{
        VERIFY(MUTEX_HELD(&mqp->mq_lock));
        VERIFY(slot < mqp->mq_cap);

        /*
         * We need to verify that we don't push over something that exists.
         * Based on the design, this should never happen. However, in the face
         * of bugs, anything is possible.
         */
        while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE)
                (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);

        if (mqp->mq_iserror == B_TRUE)
                return;

        mqp->mq_items[slot] = item;
        mqp->mq_nitems++;
        mqp->mq_ncommit++;
        if (mqp->mq_ncommit == mqp->mq_cap) {
                mqp->mq_ncommit %= mqp->mq_cap;
                mqp->mq_gncommit++;
        }
        (void) cond_broadcast(&mqp->mq_cond);
}

static void *
mergeq_pop_one(mergeq_t *mqp)
{
        void *out;

        /*
         * We can't move mq_nproc beyond mq_next if they're on the same
         * generation.
         */
        VERIFY(mqp->mq_gnext != mqp->mq_gnproc ||
            mqp->mq_nproc != mqp->mq_next);

        out = mqp->mq_items[mqp->mq_nproc];

        mqp->mq_items[mqp->mq_nproc] = NULL;
        mqp->mq_nproc++;
        if (mqp->mq_nproc == mqp->mq_cap) {
                mqp->mq_nproc %= mqp->mq_cap;
                mqp->mq_gnproc++;
        }
        mqp->mq_nitems--;

        return (out);
}

/*
 * Pop a set of two entries from the queue. We may not have anything to process
 * at the moment, eg. be waiting for someone to add something. In which case,
 * we'll be sitting and waiting.
 */
static boolean_t
mergeq_pop(mergeq_t *mqp, void **first, void **second)
{
        VERIFY(MUTEX_HELD(&mqp->mq_lock));
        VERIFY(mqp->mq_nproc < mqp->mq_cap);

        while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 &&
            mqp->mq_iserror == B_FALSE)
                (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);

        if (mqp->mq_iserror == B_TRUE)
                return (B_FALSE);

        if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) {
                VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1);
                return (B_FALSE);
        }
        VERIFY(mqp->mq_nitems >= 2);

        *first = mergeq_pop_one(mqp);
        *second = mergeq_pop_one(mqp);

        return (B_TRUE);
}

static void *
mergeq_thr_merge(void *arg)
{
        mergeq_t *mqp = arg;

        VERIFY0(mutex_lock(&mqp->mq_lock));

        /*
         * Check to make sure creation worked and if not, fail fast.
         */
        if (mqp->mq_iserror == B_TRUE) {
                VERIFY0(mutex_unlock(&mqp->mq_lock));
                return (NULL);
        }

        for (;;) {
                void *first, *second, *out;
                int ret;
                size_t slot;

                if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) {
                        VERIFY0(mutex_unlock(&mqp->mq_lock));
                        return (NULL);
                }

                if (mergeq_pop(mqp, &first, &second) == B_FALSE) {
                        VERIFY0(mutex_unlock(&mqp->mq_lock));
                        return (NULL);
                }
                slot = mergeq_slot(mqp);

                mqp->mq_nactthrs++;

                VERIFY0(mutex_unlock(&mqp->mq_lock));
                ret = mqp->mq_func(first, second, &out, mqp->mq_arg);
                VERIFY0(mutex_lock(&mqp->mq_lock));

                if (ret != 0) {
                        if (mqp->mq_iserror == B_FALSE) {
                                mqp->mq_iserror = B_TRUE;
                                mqp->mq_error = ret;
                                (void) cond_broadcast(&mqp->mq_cond);
                        }
                        mqp->mq_nactthrs--;
                        VERIFY0(mutex_unlock(&mqp->mq_lock));
                        return (NULL);
                }
                mergeq_push(mqp, slot, out);
                mqp->mq_nactthrs--;
        }
}

int
mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp,
    int *errp)
{
        int ret, i;
        boolean_t seterr = B_FALSE;

        if (mqp == NULL || func == NULL || outp == NULL) {
                return (mergeq_error(EINVAL));
        }

        VERIFY0(mutex_lock(&mqp->mq_lock));
        if (mqp->mq_working == B_TRUE) {
                VERIFY0(mutex_unlock(&mqp->mq_lock));
                return (mergeq_error(EBUSY));
        }

        if (mqp->mq_nitems == 0) {
                *outp = NULL;
                mergeq_reset(mqp);
                VERIFY0(mutex_unlock(&mqp->mq_lock));
                return (0);
        }

        /*
         * Now that we've finished adding items to the queue, turn it into a
         * circular buffer.
         */
        mqp->mq_func = func;
        mqp->mq_arg = arg;
        mqp->mq_nproc = 0;
        mqp->mq_working = B_TRUE;
        if (mqp->mq_next == mqp->mq_cap) {
                mqp->mq_next %= mqp->mq_cap;
                mqp->mq_gnext++;
        }
        mqp->mq_ncommit = mqp->mq_next;

        ret = 0;
        for (i = 0; i < mqp->mq_ndthreads; i++) {
                ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0,
                    &mqp->mq_thrs[i]);
                if (ret != 0) {
                        mqp->mq_iserror = B_TRUE;
                        break;
                }
        }

        VERIFY0(mutex_unlock(&mqp->mq_lock));
        if (ret == 0)
                (void) mergeq_thr_merge(mqp);

        for (i = 0; i < mqp->mq_ndthreads; i++) {
                VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL));
        }

        VERIFY0(mutex_lock(&mqp->mq_lock));

        VERIFY(mqp->mq_nactthrs == 0);
        mqp->mq_working = B_FALSE;
        if (ret == 0 && mqp->mq_iserror == B_FALSE) {
                VERIFY(mqp->mq_nitems == 1);
                *outp = mergeq_pop_one(mqp);
        } else if (ret == 0 && mqp->mq_iserror == B_TRUE) {
                ret = MERGEQ_UERROR;
                if (errp != NULL)
                        *errp = mqp->mq_error;
        } else {
                seterr = B_TRUE;
        }

        mergeq_reset(mqp);
        VERIFY0(mutex_unlock(&mqp->mq_lock));

        if (seterr == B_TRUE)
                return (mergeq_error(ret));

        return (ret);
}