#include <errno.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"
static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
struct l9p_request *req);
static void *
l9p_responder(void *arg)
{
struct l9p_threadpool *tp;
struct l9p_worker *worker = arg;
struct l9p_request *req;
tp = worker->ltw_tp;
for (;;) {
if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
break;
while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting) {
(void) pthread_cond_wait(&tp->ltp_reply_cv,
&tp->ltp_mtx);
}
if (worker->ltw_exiting) {
(void) pthread_mutex_unlock(&tp->ltp_mtx);
break;
}
req = STAILQ_FIRST(&tp->ltp_replyq);
STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);
req->lr_workstate = L9P_WS_REPLYING;
if (req->lr_flushstate != L9P_FLUSH_NONE)
l9p_threadpool_rflush(tp, req);
if (pthread_mutex_unlock(&tp->ltp_mtx) != 0)
break;
l9p_respond(req, false, true);
}
return (NULL);
}
static void *
l9p_worker(void *arg)
{
struct l9p_threadpool *tp;
struct l9p_worker *worker = arg;
struct l9p_request *req;
tp = worker->ltw_tp;
if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
return (NULL);
for (;;) {
while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting) {
(void) pthread_cond_wait(&tp->ltp_work_cv,
&tp->ltp_mtx);
}
if (worker->ltw_exiting)
break;
req = STAILQ_FIRST(&tp->ltp_workq);
STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
req->lr_workstate = L9P_WS_INPROGRESS;
req->lr_worker = worker;
(void) pthread_mutex_unlock(&tp->ltp_mtx);
req->lr_error = l9p_dispatch_request(req);
if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
return (NULL);
req->lr_workstate = L9P_WS_RESPQUEUED;
req->lr_worker = NULL;
STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
(void) pthread_cond_signal(&tp->ltp_reply_cv);
}
(void) pthread_mutex_unlock(&tp->ltp_mtx);
return (NULL);
}
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
struct l9p_request *flusher;
STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
if (not the last) {
flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
}
#endif
STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
}
}
int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
struct l9p_worker *worker;
#if defined(__FreeBSD__)
char threadname[16];
#endif
int error;
int i, nworkers, nresponders;
if (size <= 0)
return (EINVAL);
#ifdef __illumos__
pthread_mutexattr_t attr;
if ((error = pthread_mutexattr_init(&attr)) != 0)
return (error);
if ((error = pthread_mutexattr_settype(&attr,
PTHREAD_MUTEX_ERRORCHECK)) != 0) {
return (error);
}
error = pthread_mutex_init(&tp->ltp_mtx, &attr);
#else
error = pthread_mutex_init(&tp->ltp_mtx, NULL);
#endif
if (error)
return (error);
error = pthread_cond_init(&tp->ltp_work_cv, NULL);
if (error)
goto fail_work_cv;
error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
if (error)
goto fail_reply_cv;
STAILQ_INIT(&tp->ltp_workq);
STAILQ_INIT(&tp->ltp_replyq);
LIST_INIT(&tp->ltp_workers);
nresponders = 0;
nworkers = 0;
for (i = 0; i <= size; i++) {
worker = calloc(1, sizeof(struct l9p_worker));
#ifdef __illumos__
if (worker == NULL)
break;
#endif
worker->ltw_tp = tp;
worker->ltw_responder = i == 0;
error = pthread_create(&worker->ltw_thread, NULL,
worker->ltw_responder ? l9p_responder : l9p_worker,
(void *)worker);
if (error) {
free(worker);
break;
}
if (worker->ltw_responder)
nresponders++;
else
nworkers++;
#if defined(__FreeBSD__)
if (worker->ltw_responder) {
pthread_set_name_np(worker->ltw_thread, "9p-responder");
} else {
sprintf(threadname, "9p-worker:%d", i - 1);
pthread_set_name_np(worker->ltw_thread, threadname);
}
#elif defined(__illumos__)
if (worker->ltw_responder) {
(void) pthread_setname_np(worker->ltw_thread,
"9p-responder");
} else {
char threadname[PTHREAD_MAX_NAMELEN_NP];
(void) snprintf(threadname, sizeof (threadname),
"9p-worker:%d", i - 1);
(void) pthread_setname_np(worker->ltw_thread,
threadname);
}
#endif
LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
}
if (nresponders == 0 || nworkers == 0) {
l9p_threadpool_shutdown(tp);
return (error);
}
return (0);
fail_reply_cv:
(void) pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
(void) pthread_mutex_destroy(&tp->ltp_mtx);
return (error);
}
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{
if (req->lr_req.hdr.type == L9P_TFLUSH) {
req->lr_workstate = L9P_WS_IMMEDIATE;
(void) l9p_dispatch_request(req);
} else {
if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
return;
req->lr_workstate = L9P_WS_NOTSTARTED;
STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
(void) pthread_cond_signal(&tp->ltp_work_cv);
(void) pthread_mutex_unlock(&tp->ltp_mtx);
}
}
int
l9p_threadpool_tflush(struct l9p_request *req)
{
struct l9p_connection *conn;
struct l9p_threadpool *tp;
struct l9p_request *flushee;
uint16_t oldtag;
enum l9p_flushstate nstate = L9P_FLUSH_NONE;
int err;
req->lr_error = 0;
conn = req->lr_conn;
tp = &conn->lc_tp;
oldtag = req->lr_req.tflush.oldtag;
if ((err = ht_wrlock(&conn->lc_requests)) != 0)
return (err);
flushee = ht_find_locked(&conn->lc_requests, oldtag);
if (flushee == NULL) {
(void) ht_unlock(&conn->lc_requests);
if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0)
return (err);
goto done;
}
if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0) {
(void) ht_unlock(&conn->lc_requests);
return (err);
}
(void) ht_unlock(&conn->lc_requests);
switch (flushee->lr_workstate) {
case L9P_WS_NOTSTARTED:
nstate = L9P_FLUSH_REQUESTED_PRE_START;
break;
case L9P_WS_IMMEDIATE:
nstate = L9P_FLUSH_REQUESTED_POST_START;
break;
case L9P_WS_INPROGRESS:
#ifdef notyet
pthread_kill(...);
#endif
nstate = L9P_FLUSH_REQUESTED_POST_START;
break;
case L9P_WS_RESPQUEUED:
nstate = L9P_FLUSH_TOOLATE;
break;
case L9P_WS_REPLYING:
goto done;
}
if (flushee->lr_flushstate == L9P_FLUSH_NONE)
STAILQ_INIT(&flushee->lr_flushq);
flushee->lr_flushstate = nstate;
STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);
(void) pthread_mutex_unlock(&tp->ltp_mtx);
return (0);
done:
req->lr_workstate = L9P_WS_RESPQUEUED;
STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
(void) pthread_mutex_unlock(&tp->ltp_mtx);
(void) pthread_cond_signal(&tp->ltp_reply_cv);
return (0);
}
int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
struct l9p_worker *worker, *tmp;
LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
continue;
worker->ltw_exiting = true;
if (worker->ltw_responder)
(void) pthread_cond_signal(&tp->ltp_reply_cv);
else
(void) pthread_cond_broadcast(&tp->ltp_work_cv);
(void) pthread_mutex_unlock(&tp->ltp_mtx);
(void) pthread_join(worker->ltw_thread, NULL);
LIST_REMOVE(worker, ltw_link);
free(worker);
}
(void) pthread_cond_destroy(&tp->ltp_reply_cv);
(void) pthread_cond_destroy(&tp->ltp_work_cv);
(void) pthread_mutex_destroy(&tp->ltp_mtx);
return (0);
}