#include <strings.h>
#include <sys/debug.h>
#include <thread.h>
#include <synch.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include "workq.h"
/*
 * State for one work queue.  Items are accumulated with workq_add() and then
 * processed in a single pass by workq_work().
 */
struct workq {
/* Guards the queue state; taken by workq_add, workq_thr_work, workq_work. */
mutex_t wq_lock;
/*
 * Initialized in workq_init and destroyed in workq_fini; nothing in the code
 * visible here waits on or signals it.
 */
cond_t wq_cond;
/* Array of queued item pointers; has wq_cap slots. */
void **wq_items;
/* Number of items currently stored in wq_items. */
size_t wq_nitems;
/* Number of slots allocated in wq_items. */
size_t wq_cap;
/* Index of the next item workq_pop() will hand out. */
size_t wq_next;
/*
 * Number of threads workq_work() creates; set to nthrs - 1 because the
 * thread calling workq_work() also processes items.
 */
uint_t wq_ndthreads;
/* Thread ids filled in by thr_create(); wq_ndthreads entries. */
thread_t *wq_thrs;
/* Callback invoked for each item during workq_work(). */
workq_proc_f *wq_func;
/* Opaque second argument passed to wq_func. */
void *wq_arg;
/* B_TRUE while a workq_work() pass is in progress. */
boolean_t wq_working;
/* Set when a callback returned non-zero or thread creation failed. */
boolean_t wq_iserror;
/* The first non-zero value returned by a callback. */
int wq_error;
};
/*
 * Number of item slots allocated by workq_init(), and the number of slots
 * added each time workq_grow() enlarges the array.
 */
#define WORKQ_DEFAULT_CAP 64
/*
 * Record a failure for the caller: store the (non-zero) error number in errno
 * and return WORKQ_ERROR so the call site can simply return the result.
 */
static int
workq_error(int err)
{
/* A zero error number would make the failure indistinguishable via errno. */
VERIFY(err != 0);
errno = err;
return (WORKQ_ERROR);
}
/*
 * Tear down a work queue and release everything workq_init() and
 * workq_grow() allocated for it.  Passing NULL is a no-op.  The queue must
 * not be in the middle of a workq_work() pass.
 */
void
workq_fini(workq_t *wqp)
{
	if (wqp != NULL) {
		VERIFY(wqp->wq_working != B_TRUE);

		/* Synchronization objects go first, then the arrays. */
		VERIFY0(mutex_destroy(&wqp->wq_lock));
		VERIFY0(cond_destroy(&wqp->wq_cond));

		if (wqp->wq_cap != 0) {
			workq_free(wqp->wq_items,
			    wqp->wq_cap * sizeof (void *));
		}

		if (wqp->wq_ndthreads != 0) {
			workq_free(wqp->wq_thrs,
			    wqp->wq_ndthreads * sizeof (thread_t));
		}

		workq_free(wqp, sizeof (workq_t));
	}
}
/*
 * Allocate and initialize a work queue.
 *
 * outp  - receives the new queue on success; must not be NULL.
 * nthrs - total number of threads that will process items, including the
 *         thread that later calls workq_work(); must be at least 1.
 *
 * Returns 0 on success.  On failure returns WORKQ_ERROR with errno set to
 * EINVAL (bad argument), ENOMEM (allocation failure), or the error reported
 * by mutex_init()/cond_init(); nothing is left allocated in that case.
 */
int
workq_init(workq_t **outp, uint_t nthrs)
{
	int ret;
	workq_t *wqp;

	/*
	 * nthrs == 0 would make "nthrs - 1" wrap to UINT_MAX below, asking
	 * for an enormous thread-id array and thread count.
	 */
	if (outp == NULL || nthrs == 0)
		return (workq_error(EINVAL));

	wqp = workq_alloc(sizeof (workq_t));
	if (wqp == NULL)
		return (workq_error(ENOMEM));
	bzero(wqp, sizeof (workq_t));

	wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP);
	if (wqp->wq_items == NULL) {
		ret = ENOMEM;
		goto fail_wq;
	}
	bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);

	/* The caller of workq_work() is one of the nthrs workers. */
	wqp->wq_ndthreads = nthrs - 1;
	if (wqp->wq_ndthreads > 0) {
		wqp->wq_thrs = workq_alloc(sizeof (thread_t) *
		    wqp->wq_ndthreads);
		if (wqp->wq_thrs == NULL) {
			ret = ENOMEM;
			goto fail_items;
		}
	}

	ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK, NULL);
	if (ret != 0)
		goto fail_thrs;

	ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL);
	if (ret != 0)
		goto fail_lock;

	/*
	 * wq_cap is published last so that it is only non-zero for a fully
	 * constructed queue.
	 */
	wqp->wq_cap = WORKQ_DEFAULT_CAP;
	*outp = wqp;
	return (0);

	/* Unwind in reverse order of construction. */
fail_lock:
	VERIFY0(mutex_destroy(&wqp->wq_lock));
fail_thrs:
	if (wqp->wq_ndthreads > 0) {
		workq_free(wqp->wq_thrs,
		    sizeof (thread_t) * wqp->wq_ndthreads);
	}
fail_items:
	workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
fail_wq:
	workq_free(wqp, sizeof (workq_t));
	return (workq_error(ret));
}
/*
 * Return the queue to its empty state: forget all items, the callback and
 * its argument, and any recorded error.  The allocated capacity is kept.
 * The caller must hold wq_lock and no work pass may be running.
 */
static void
workq_reset(workq_t *wqp)
{
	VERIFY(MUTEX_HELD(&wqp->wq_lock));
	VERIFY(wqp->wq_working == B_FALSE);

	/* Clear the bookkeeping first; none of it depends on the array. */
	wqp->wq_error = 0;
	wqp->wq_iserror = B_FALSE;
	wqp->wq_arg = NULL;
	wqp->wq_func = NULL;
	wqp->wq_next = 0;
	wqp->wq_nitems = 0;

	if (wqp->wq_cap != 0) {
		bzero(wqp->wq_items, wqp->wq_cap * sizeof (void *));
	}
}
/*
 * Enlarge the item array by WORKQ_DEFAULT_CAP slots, preserving the existing
 * entries and zeroing the new ones.  The caller must hold wq_lock and no
 * work pass may be running.
 *
 * Returns 0 on success, ENOSPC if the new capacity (or its size in bytes)
 * cannot be represented in a size_t, or ENOMEM if allocation fails.  On
 * failure the queue is left untouched.
 */
static int
workq_grow(workq_t *wqp)
{
	size_t ncap;
	void **items;

	VERIFY(MUTEX_HELD(&wqp->wq_lock));
	VERIFY(wqp->wq_working == B_FALSE);

	if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP)
		return (ENOSPC);
	ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP;

	/* The slot count fits; make sure the byte count does as well. */
	if (ncap > SIZE_MAX / sizeof (void *))
		return (ENOSPC);

	items = workq_alloc(ncap * sizeof (void *));
	if (items == NULL)
		return (ENOMEM);

	/* Carry over the old entries, then zero only the newly added tail. */
	bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *));
	bzero(items + wqp->wq_cap, (ncap - wqp->wq_cap) * sizeof (void *));

	workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
	wqp->wq_items = items;
	wqp->wq_cap = ncap;
	return (0);
}
/*
 * Append an item to the queue, growing the item array when it is full.
 *
 * Returns 0 on success.  On failure returns WORKQ_ERROR with errno set to
 * ENXIO if a work pass is in progress, or to the error from workq_grow().
 */
int
workq_add(workq_t *wqp, void *item)
{
	int err = 0;

	VERIFY0(mutex_lock(&wqp->wq_lock));

	if (wqp->wq_working == B_TRUE) {
		/* Items cannot be added while the queue is being drained. */
		err = ENXIO;
	} else if (wqp->wq_nitems == wqp->wq_cap) {
		err = workq_grow(wqp);
	}

	if (err == 0) {
		wqp->wq_items[wqp->wq_nitems] = item;
		wqp->wq_nitems += 1;
	}

	VERIFY0(mutex_unlock(&wqp->wq_lock));

	/* errno is set only after the lock has been dropped. */
	if (err != 0)
		return (workq_error(err));
	return (0);
}
/*
 * Take the next unprocessed item off the queue and advance wq_next.  The
 * vacated slot is set to NULL.  The caller must hold wq_lock and there must
 * be at least one item left.
 */
static void *
workq_pop(workq_t *wqp)
{
	void **slot;
	void *item;

	VERIFY(MUTEX_HELD(&wqp->wq_lock));
	VERIFY(wqp->wq_next < wqp->wq_nitems);

	slot = &wqp->wq_items[wqp->wq_next];
	item = *slot;
	*slot = NULL;
	wqp->wq_next += 1;

	return (item);
}
/*
 * Worker loop, run by each created thread and by the thread that called
 * workq_work().  Repeatedly pops an item and hands it to wq_func until the
 * queue is drained or an error has been flagged.  The lock is dropped around
 * the callback so that workers can run callbacks concurrently.
 *
 * The first non-zero callback result is stored in wq_error and wq_iserror is
 * set; a worker whose callback fails stops immediately.  Always returns
 * NULL.
 */
static void *
workq_thr_work(void *arg)
{
	workq_t *wqp = arg;

	VERIFY0(mutex_lock(&wqp->wq_lock));
	VERIFY(wqp->wq_working == B_TRUE);

	while (wqp->wq_iserror != B_TRUE &&
	    wqp->wq_next != wqp->wq_nitems) {
		void *work;
		int rc;

		work = workq_pop(wqp);

		/* Run the callback without holding the queue lock. */
		VERIFY0(mutex_unlock(&wqp->wq_lock));
		rc = wqp->wq_func(work, wqp->wq_arg);
		VERIFY0(mutex_lock(&wqp->wq_lock));

		if (rc == 0)
			continue;

		/* Only the first failure is recorded. */
		if (wqp->wq_iserror == B_FALSE) {
			wqp->wq_iserror = B_TRUE;
			wqp->wq_error = rc;
		}
		break;
	}

	VERIFY0(mutex_unlock(&wqp->wq_lock));
	return (NULL);
}
/*
 * Process every queued item by calling func(item, arg), using wq_ndthreads
 * created threads plus the calling thread.  The queue is reset (emptied)
 * before returning, whether or not processing succeeded.
 *
 * Returns:
 *   0            - all items processed, or the queue was empty.
 *   WORKQ_ERROR  - errno is EINVAL (NULL wqp or func), EBUSY (a pass is
 *                  already running), or the error from thr_create().
 *   WORKQ_UERROR - a callback returned non-zero; if errp is not NULL it
 *                  receives the first such value.
 */
int
workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp)
{
	uint_t i, nstarted;
	int ret;
	boolean_t seterr = B_FALSE;

	if (wqp == NULL || func == NULL)
		return (workq_error(EINVAL));

	VERIFY0(mutex_lock(&wqp->wq_lock));
	if (wqp->wq_working == B_TRUE) {
		VERIFY0(mutex_unlock(&wqp->wq_lock));
		return (workq_error(EBUSY));
	}

	if (wqp->wq_nitems == 0) {
		workq_reset(wqp);
		VERIFY0(mutex_unlock(&wqp->wq_lock));
		return (0);
	}

	wqp->wq_func = func;
	wqp->wq_arg = arg;
	wqp->wq_next = 0;
	wqp->wq_working = B_TRUE;

	/*
	 * Start the helper threads while holding the lock; they block in
	 * workq_thr_work() until it is released below.  Stop at the first
	 * failure so that ret keeps the thr_create() error and nstarted
	 * counts exactly the thread ids that are valid to join.
	 */
	ret = 0;
	nstarted = 0;
	for (i = 0; i < wqp->wq_ndthreads; i++) {
		ret = thr_create(NULL, 0, workq_thr_work, wqp, 0,
		    &wqp->wq_thrs[i]);
		if (ret != 0) {
			/* Makes already-started workers exit right away. */
			wqp->wq_iserror = B_TRUE;
			break;
		}
		nstarted++;
	}

	VERIFY0(mutex_unlock(&wqp->wq_lock));

	/* The calling thread only takes part if every helper started. */
	if (ret == 0)
		(void) workq_thr_work(wqp);

	for (i = 0; i < nstarted; i++) {
		VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL));
	}

	VERIFY0(mutex_lock(&wqp->wq_lock));
	wqp->wq_working = B_FALSE;
	if (ret != 0) {
		/* Thread creation failed; report it through errno. */
		seterr = B_TRUE;
	} else if (wqp->wq_iserror == B_TRUE) {
		ret = WORKQ_UERROR;
		if (errp != NULL)
			*errp = wqp->wq_error;
	}

	workq_reset(wqp);
	VERIFY0(mutex_unlock(&wqp->wq_lock));

	/* errno is set only after the lock has been dropped. */
	if (seterr == B_TRUE)
		return (workq_error(ret));

	return (ret);
}