#include <thread.h>
#include <synch.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/debug.h>
#include <sys/sysmacros.h>
#include "libzfs_taskq.h"
/*
 * Internal state bit stored in ztq_flags.  zfs_taskq_create() sets it and
 * zfs_taskq_destroy() clears it; worker threads keep looping while it is set.
 */
#define ZFS_TASKQ_ACTIVE 0x00010000
/* Length of the name buffer in struct zfs_taskq, minus one byte. */
#define ZFS_TASKQ_NAMELEN 31
/*
 * One unit of work.  An entry is linked either on a taskq's pending list
 * (through both ztqent_next and ztqent_prev) or on its free list (through
 * ztqent_next only).
 */
typedef struct zfs_taskq_ent {
/* Next entry on the pending list or the free list. */
struct zfs_taskq_ent *ztqent_next;
/* Previous entry on the pending list. */
struct zfs_taskq_ent *ztqent_prev;
/* Function a worker thread calls for this entry. */
ztask_func_t *ztqent_func;
/* Sole argument passed to ztqent_func. */
void *ztqent_arg;
/*
 * Per-entry flags.  zfs_taskq_dispatch() stores 0 here; the worker thread
 * tests the ZFS_TQENT_FLAG_PREALLOC bit.
 */
uintptr_t ztqent_flags;
} zfs_taskq_ent_t;
/*
 * A task queue: a pending list of entries, a pool of worker threads that
 * drain it, and a bounded cache of entry structures.
 */
struct zfs_taskq {
/* Name supplied to zfs_taskq_create(). */
char ztq_name[ZFS_TASKQ_NAMELEN + 1];
/* Held while the lists, counters and flags below are read or changed. */
mutex_t ztq_lock;
/*
 * Taken as a reader by a worker around each task function call.
 * NOTE(review): no writer of this lock is visible in this file.
 */
rwlock_t ztq_threadlock;
/* Signalled by dispatch; idle workers and the destroy path use it. */
cond_t ztq_dispatch_cv;
/* Broadcast when ztq_active drops to 0 and when a worker exits. */
cond_t ztq_wait_cv;
/* Array of ztq_nthreads thread ids filled in by thr_create(). */
thread_t *ztq_threadlist;
/* Caller-supplied creation flags, plus ZFS_TASKQ_ACTIVE while running. */
int ztq_flags;
/*
 * Starts at the thread count; a worker decrements it before sleeping on
 * ztq_dispatch_cv and increments it again after waking.
 */
int ztq_active;
/* Worker threads that have not yet left their main loop. */
int ztq_nthreads;
/* Entries obtained from umem_alloc() and not yet given to umem_free(). */
int ztq_nalloc;
/* While ztq_nalloc is at or below this, freed entries go to the free list. */
int ztq_minalloc;
/* Once ztq_nalloc reaches this, ztask_alloc() fails or throttles. */
int ztq_maxalloc;
/* Signalled by ztask_free() when ztq_maxalloc_wait is non-zero. */
cond_t ztq_maxalloc_cv;
/* Number of ztask_alloc() callers currently waiting on ztq_maxalloc_cv. */
int ztq_maxalloc_wait;
/* Free entries, singly linked through ztqent_next. */
zfs_taskq_ent_t *ztq_freelist;
/*
 * Head of the circular, doubly linked pending list; it points at itself
 * when the list is empty and never carries a task of its own.
 */
zfs_taskq_ent_t ztq_task;
};
/*
 * Obtain a task entry for 'ztq'.  The caller holds ztq_lock.  The lock is
 * dropped around umem_alloc() and is handed to cond_reltimedwait() while
 * throttling, and it is held again on return.
 *
 * An entry comes off the free list when one is available and ztq_nalloc has
 * reached ztq_minalloc.  Otherwise a new entry is requested from
 * umem_alloc() with 'ztqflags' passed through unchanged.
 *
 * If ztq_nalloc has already reached ztq_maxalloc, a caller without
 * UMEM_NOFAIL gets NULL straight away.  A UMEM_NOFAIL caller waits up to one
 * second on ztq_maxalloc_cv: a return of 0 from that wait restarts the whole
 * decision, any other return goes on to allocate despite the limit.
 *
 * Returns the entry, or NULL when none could be provided.
 */
static zfs_taskq_ent_t *
ztask_alloc(zfs_taskq_t *ztq, int ztqflags)
{
	zfs_taskq_ent_t *ent;

	for (;;) {
		timestruc_t delay;
		int rc;

		/* Reuse a cached entry once the minimum has been allocated. */
		ent = ztq->ztq_freelist;
		if (ent != NULL && ztq->ztq_nalloc >= ztq->ztq_minalloc) {
			ztq->ztq_freelist = ent->ztqent_next;
			return (ent);
		}

		/* Below the cap: go and allocate a fresh entry. */
		if (ztq->ztq_nalloc < ztq->ztq_maxalloc)
			break;

		if ((ztqflags & UMEM_NOFAIL) == 0)
			return (NULL);

		/* At the cap and not allowed to fail: throttle, then retry. */
		delay.tv_sec = 1;
		delay.tv_nsec = 0;
		ztq->ztq_maxalloc_wait++;
		rc = cond_reltimedwait(&ztq->ztq_maxalloc_cv,
		    &ztq->ztq_lock, &delay);
		ztq->ztq_maxalloc_wait--;

		/* Anything other than a clean wakeup falls through. */
		if (rc != 0)
			break;
	}

	/* Do not hold the queue lock across the allocator call. */
	mutex_exit(&ztq->ztq_lock);
	ent = umem_alloc(sizeof (zfs_taskq_ent_t), ztqflags);
	mutex_enter(&ztq->ztq_lock);

	if (ent != NULL)
		ztq->ztq_nalloc++;

	return (ent);
}
/*
 * Give entry 't' back to 'ztq'.  The caller holds ztq_lock; the lock is
 * dropped around umem_free() and is held again on return.
 *
 * While ztq_nalloc does not exceed ztq_minalloc the entry is pushed onto the
 * free list.  Otherwise ztq_nalloc is decremented and the memory is
 * released.  In both cases one waiter on ztq_maxalloc_cv is signalled if
 * ztq_maxalloc_wait is non-zero.
 */
static void
ztask_free(zfs_taskq_t *ztq, zfs_taskq_ent_t *t)
{
	if (ztq->ztq_nalloc > ztq->ztq_minalloc) {
		/* Above the floor: really release the memory. */
		ztq->ztq_nalloc--;
		mutex_exit(&ztq->ztq_lock);
		umem_free(t, sizeof (zfs_taskq_ent_t));
		mutex_enter(&ztq->ztq_lock);
	} else {
		/* At or below the floor: cache the entry for reuse. */
		t->ztqent_next = ztq->ztq_freelist;
		ztq->ztq_freelist = t;
	}

	if (ztq->ztq_maxalloc_wait != 0)
		VERIFY0(cond_signal(&ztq->ztq_maxalloc_cv));
}
/*
 * Queue 'func(arg)' for execution by one of the worker threads of 'ztq'.
 *
 * 'ztqflags' is passed to ztask_alloc() for the entry allocation.  If it
 * contains ZFS_TQ_FRONT the entry is inserted at the head of the pending
 * list, otherwise at the tail.
 *
 * Returns 1 once the entry is queued and a worker has been signalled, or 0
 * if no entry could be allocated.
 */
zfs_taskqid_t
zfs_taskq_dispatch(zfs_taskq_t *ztq, ztask_func_t func, void *arg,
    uint_t ztqflags)
{
	zfs_taskq_ent_t *head = &ztq->ztq_task;
	zfs_taskq_ent_t *ent;
	zfs_taskq_ent_t *before;
	zfs_taskq_ent_t *after;

	mutex_enter(&ztq->ztq_lock);
	ASSERT(ztq->ztq_flags & ZFS_TASKQ_ACTIVE);

	ent = ztask_alloc(ztq, ztqflags);
	if (ent == NULL) {
		mutex_exit(&ztq->ztq_lock);
		return (0);
	}

	/* Pick the two neighbours the new entry will sit between. */
	if (ztqflags & ZFS_TQ_FRONT) {
		before = head;
		after = head->ztqent_next;
	} else {
		before = head->ztqent_prev;
		after = head;
	}

	ent->ztqent_next = after;
	ent->ztqent_prev = before;
	after->ztqent_prev = ent;
	before->ztqent_next = ent;

	ent->ztqent_func = func;
	ent->ztqent_arg = arg;
	ent->ztqent_flags = 0;

	VERIFY0(cond_signal(&ztq->ztq_dispatch_cv));
	mutex_exit(&ztq->ztq_lock);

	return (1);
}
/*
 * Block until the pending list of 'ztq' is empty and ztq_active is zero.
 * The check is repeated after every wakeup from ztq_wait_cv; cond_wait()
 * must return 0 or EINTR.
 */
void
zfs_taskq_wait(zfs_taskq_t *ztq)
{
	zfs_taskq_ent_t *head = &ztq->ztq_task;

	mutex_enter(&ztq->ztq_lock);

	for (;;) {
		int rc;

		/* Finished once nothing is queued and no worker is busy. */
		if (head->ztqent_next == head && ztq->ztq_active == 0)
			break;

		rc = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock);
		VERIFY(rc == 0 || rc == EINTR);
	}

	mutex_exit(&ztq->ztq_lock);
}
/*
 * Body of each worker thread; 'arg' is the owning zfs_taskq_t.
 *
 * The thread loops for as long as ZFS_TASKQ_ACTIVE is set in ztq_flags,
 * taking the entry at the head of the pending list and calling its function
 * with ztq_lock dropped.  When the list is empty it sleeps on
 * ztq_dispatch_cv.  On the way out it decrements ztq_nthreads and
 * broadcasts ztq_wait_cv.  Always returns NULL.
 */
static void *
zfs_taskq_thread(void *arg)
{
zfs_taskq_t *ztq = arg;
zfs_taskq_ent_t *t;
boolean_t prealloc;
mutex_enter(&ztq->ztq_lock);
while (ztq->ztq_flags & ZFS_TASKQ_ACTIVE) {
/* The list head pointing at itself means nothing is pending. */
if ((t = ztq->ztq_task.ztqent_next) == &ztq->ztq_task) {
int ret;
/* Going idle: wake waiters if this was the last busy worker. */
if (--ztq->ztq_active == 0)
VERIFY0(cond_broadcast(&ztq->ztq_wait_cv));
ret = cond_wait(&ztq->ztq_dispatch_cv, &ztq->ztq_lock);
VERIFY(ret == 0 || ret == EINTR);
/* Count as busy again, then re-check the flag and the list. */
ztq->ztq_active++;
continue;
}
/* Unlink the entry from the pending list. */
t->ztqent_prev->ztqent_next = t->ztqent_next;
t->ztqent_next->ztqent_prev = t->ztqent_prev;
t->ztqent_next = NULL;
t->ztqent_prev = NULL;
/* Sample the flag now, before the task function runs. */
prealloc = t->ztqent_flags & ZFS_TQENT_FLAG_PREALLOC;
mutex_exit(&ztq->ztq_lock);
/* The task function runs without ztq_lock, under the reader lock. */
VERIFY0(rw_rdlock(&ztq->ztq_threadlock));
t->ztqent_func(t->ztqent_arg);
VERIFY0(rw_unlock(&ztq->ztq_threadlock));
mutex_enter(&ztq->ztq_lock);
/*
 * Entries flagged ZFS_TQENT_FLAG_PREALLOC are not handed to ztask_free().
 * NOTE(review): presumably their storage belongs to the dispatcher; no
 * code in this file sets that flag -- confirm against callers.
 */
if (!prealloc)
ztask_free(ztq, t);
}
/* Tell zfs_taskq_destroy() that this worker has left its loop. */
ztq->ztq_nthreads--;
VERIFY0(cond_broadcast(&ztq->ztq_wait_cv));
mutex_exit(&ztq->ztq_lock);
return (NULL);
}
/*
 * Create a task queue served by 'nthreads' worker threads.
 *
 *   name      label copied into the queue; at most ZFS_TASKQ_NAMELEN
 *             characters are kept and the copy is always NUL-terminated
 *   nthreads  number of worker threads to start (asserted to be >= 1)
 *   pri       accepted for interface compatibility; not used here
 *   minalloc  number of task entries kept cached on the free list
 *   maxalloc  allocation count at which ztask_alloc() fails or throttles
 *   flags     stored in ztq_flags together with ZFS_TASKQ_ACTIVE; if
 *             ZFS_TASKQ_PREPOPULATE is set, 'minalloc' entries are
 *             allocated and cached before any thread starts
 *
 * Allocations use UMEM_NOFAIL and every initialisation step, including
 * thread creation, is checked with VERIFY0, so this never returns NULL.
 * The caller releases the queue with zfs_taskq_destroy().
 */
zfs_taskq_t *
zfs_taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	zfs_taskq_t *ztq = umem_zalloc(sizeof (zfs_taskq_t), UMEM_NOFAIL);
	int t;

	ASSERT3S(nthreads, >=, 1);

	VERIFY0(rwlock_init(&ztq->ztq_threadlock, USYNC_THREAD, NULL));
	VERIFY0(cond_init(&ztq->ztq_dispatch_cv, USYNC_THREAD, NULL));
	VERIFY0(cond_init(&ztq->ztq_wait_cv, USYNC_THREAD, NULL));
	VERIFY0(cond_init(&ztq->ztq_maxalloc_cv, USYNC_THREAD, NULL));
	VERIFY0(mutex_init(
	    &ztq->ztq_lock, LOCK_NORMAL | LOCK_ERRORCHECK, NULL));

	/*
	 * strncpy() does not terminate the destination when the source
	 * fills it, so copy one byte less than the buffer holds and set
	 * the final byte explicitly.
	 */
	(void) strncpy(ztq->ztq_name, name, ZFS_TASKQ_NAMELEN);
	ztq->ztq_name[ZFS_TASKQ_NAMELEN] = '\0';

	ztq->ztq_flags = flags | ZFS_TASKQ_ACTIVE;
	ztq->ztq_active = nthreads;
	ztq->ztq_nthreads = nthreads;
	ztq->ztq_minalloc = minalloc;
	ztq->ztq_maxalloc = maxalloc;

	/* An empty pending list is a head that points at itself. */
	ztq->ztq_task.ztqent_next = &ztq->ztq_task;
	ztq->ztq_task.ztqent_prev = &ztq->ztq_task;

	ztq->ztq_threadlist =
	    umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL);

	if (flags & ZFS_TASKQ_PREPOPULATE) {
		/* Allocate-then-free parks each new entry on the free list. */
		mutex_enter(&ztq->ztq_lock);
		while (minalloc-- > 0)
			ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL));
		mutex_exit(&ztq->ztq_lock);
	}

	/*
	 * ztq_active and ztq_nthreads already count every worker.  A thread
	 * that failed to start would leave both counters permanently too
	 * high, so zfs_taskq_wait() and zfs_taskq_destroy() would block
	 * forever; treat a creation failure as fatal instead.
	 */
	for (t = 0; t < nthreads; t++) {
		VERIFY0(thr_create(0, 0, zfs_taskq_thread,
		    ztq, THR_BOUND, &ztq->ztq_threadlist[t]));
	}

	return (ztq);
}
/*
 * Tear down 'ztq' and release everything it owns.
 *
 * Steps, in order: wait for the queue to drain; clear ZFS_TASKQ_ACTIVE and
 * wake every worker sleeping on ztq_dispatch_cv; wait on ztq_wait_cv until
 * ztq_nthreads is zero; set ztq_minalloc to 0 and cycle entries through
 * ztask_alloc()/ztask_free() until ztq_nalloc is zero; join the threads;
 * destroy the synchronisation objects and free the queue itself.
 */
void
zfs_taskq_destroy(zfs_taskq_t *ztq)
{
	/* Captured up front; the live counter reaches zero further down. */
	int nthr = ztq->ztq_nthreads;
	int i;

	zfs_taskq_wait(ztq);

	mutex_enter(&ztq->ztq_lock);

	ztq->ztq_flags &= ~ZFS_TASKQ_ACTIVE;
	VERIFY0(cond_broadcast(&ztq->ztq_dispatch_cv));

	for (;;) {
		int rc;

		if (ztq->ztq_nthreads == 0)
			break;

		rc = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock);
		VERIFY(rc == 0 || rc == EINTR);
	}

	/* With no floor, ztask_free() releases entries instead of caching. */
	ztq->ztq_minalloc = 0;
	while (ztq->ztq_nalloc != 0) {
		zfs_taskq_ent_t *ent;

		ASSERT(ztq->ztq_freelist != NULL);
		ent = ztask_alloc(ztq, UMEM_NOFAIL);
		ztask_free(ztq, ent);
	}

	mutex_exit(&ztq->ztq_lock);

	i = 0;
	while (i < nthr) {
		(void) thr_join(ztq->ztq_threadlist[i], NULL, NULL);
		i++;
	}

	umem_free(ztq->ztq_threadlist, nthr * sizeof (thread_t));

	VERIFY0(rwlock_destroy(&ztq->ztq_threadlock));
	VERIFY0(cond_destroy(&ztq->ztq_dispatch_cv));
	VERIFY0(cond_destroy(&ztq->ztq_wait_cv));
	VERIFY0(cond_destroy(&ztq->ztq_maxalloc_cv));
	VERIFY0(mutex_destroy(&ztq->ztq_lock));

	umem_free(ztq, sizeof (zfs_taskq_t));
}