#include "lint.h"
#include "thr_uberdata.h"
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <thread.h>
#include <pthread.h>
#include <synch.h>
#include <port.h>
#include <signal.h>
#include <stdio.h>
#include <errno.h>
#include <stdarg.h>
#include <string.h>
#include <sys/aiocb.h>
#include <time.h>
#include <signal.h>
#include <fcntl.h>
#include "sigev_thread.h"
/*
 * Communication data shared by all SIGEV_THREAD AIO requests in this
 * process.  Published by _aio_sigev_thread_init() and cleared by
 * postfork1_child_sigev_aio().
 */
thread_communication_data_t *sigev_aio_tcd = NULL;
/* Non-zero enables _rt_dprintf() output; loaded from $_RT_DEBUG. */
static int _rt_debug = 0;
/*
 * Read the _RT_DEBUG environment variable and, when it is present,
 * store its integer value in _rt_debug.  An absent variable leaves
 * _rt_debug untouched.
 */
void
init_sigev_thread(void)
{
	const char *setting = getenv("_RT_DEBUG");

	if (setting == NULL)
		return;
	_rt_debug = atoi(setting);
}
/*
 * printf-style debug output to stderr, prefixed with "DEBUG: ".
 * Nothing is written unless _rt_debug is non-zero.  stderr is locked
 * for the duration of the message; the unlock is registered as a
 * cleanup handler so the stream lock is released even if the thread
 * terminates while the handler is in place.
 */
static void
_rt_dprintf(const char *format, ...)
{
	va_list ap;

	if (_rt_debug == 0)
		return;

	va_start(ap, format);
	flockfile(stderr);
	pthread_cleanup_push(funlockfile, stderr);
	(void) fputs("DEBUG: ", stderr);
	(void) vfprintf(stderr, format, ap);
	pthread_cleanup_pop(1);		/* runs funlockfile(stderr) */
	va_end(ap);
}
/*
 * Run one user notification.  'arg' is a sigev_thread_data_t allocated
 * with lmalloc(); its contents are copied out and the request is freed
 * BEFORE the user function is called, so the request is released no
 * matter what the callback does.  Always returns NULL.
 *
 * Called through notifier() and also used directly as a thread start
 * routine by aio_spawner().
 */
static void *
notify_thread(void *arg)
{
	sigev_thread_data_t *request = arg;
	sigev_thread_data_t job = *request;

	lfree(request, sizeof (*request));
	job.std_func(job.std_arg);
	return (NULL);
}
/*
 * Job entry point handed to tpool_dispatch() by sigev_add_work().
 * It only adapts notify_thread(), which returns void *, to a
 * void-returning function; the return value is discarded.
 */
static void
notifier(void *arg)
{
(void) notify_thread(arg);
}
/*
 * Queue one notification (function + argument) on the thread pool
 * attached to 'tcdp'.
 *
 * Returns 0 on success.  On failure returns a non-zero errno value:
 * EINVAL when the tcd has no pool, otherwise the errno left by the
 * failing lmalloc() or tpool_dispatch() call.  On success, ownership of
 * the request passes to the pool job, and notify_thread() frees it.
 */
static int
sigev_add_work(thread_communication_data_t *tcdp,
    void (*function)(union sigval), union sigval argument)
{
	tpool_t *tpool = tcdp->tcd_poolp;
	sigev_thread_data_t *stdp;
	int error;

	if (tpool == NULL)
		return (EINVAL);

	/*
	 * Callers treat 0 as "work queued", so never hand back a zero
	 * errno on a failure path; fall back to ENOMEM.
	 */
	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
		return (errno != 0 ? errno : ENOMEM);

	stdp->std_func = function;
	stdp->std_arg = argument;
	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
		/* Capture errno before cleanup has a chance to disturb it. */
		error = (errno != 0) ? errno : ENOMEM;
		lfree(stdp, sizeof (*stdp));
		return (error);
	}
	return (0);
}
/*
 * Cleanup handler registered by the spawner threads (timer_spawner,
 * mqueue_spawner, aio_spawner) with pthread_cleanup_push().
 *
 * Abandons the thread pool, if any, and then frees the tcd -- except in
 * the MQ case when a close is in progress (tcd_msg_closing set), where
 * it only clears tcd_server_id and wakes the waiter in del_sigev_mq(),
 * which frees the tcd itself.
 */
static void
sigev_destroy_pool(thread_communication_data_t *tcdp)
{
if (tcdp->tcd_poolp != NULL)
tpool_abandon(tcdp->tcd_poolp);
tcdp->tcd_poolp = NULL;
if (tcdp->tcd_subsystem == MQ) {
/* Synchronize with del_sigev_mq(), which waits on tcd_cv. */
sig_mutex_lock(&tcdp->tcd_lock);
/* Clearing the id is what del_sigev_mq() waits for. */
tcdp->tcd_server_id = 0;
if (tcdp->tcd_msg_closing) {
(void) cond_broadcast(&tcdp->tcd_cv);
sig_mutex_unlock(&tcdp->tcd_lock);
/* del_sigev_mq() frees the tcd via del_sigev_mq_cleanup(). */
return;
}
sig_mutex_unlock(&tcdp->tcd_lock);
}
/* Nobody else is tearing the tcd down: release it here. */
free_sigev_handler(tcdp);
}
/*
 * Start routine of the spawner thread for a SIGEV_THREAD timer.
 *
 * Repeatedly fetches an event from the tcd's event port.  For each
 * PORT_SOURCE_TIMER event it records the overrun count (event count
 * minus one), queues the user's notification function on the pool and
 * waits for the pool to go idle before fetching the next event.
 *
 * The loop ends when port_get() fails, when an alert arrives (errno is
 * set to EPROTO if it is not SIGEV_THREAD_TERM), when an event from an
 * unexpected source arrives (errno = EPROTO), or when the work cannot
 * be queued.  On every exit, including cancellation, the cleanup
 * handler sigev_destroy_pool() runs.  Always returns NULL.
 */
void *
timer_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	port_event_t event;

	/* Destroy the pool when we leave, even if we are cancelled. */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	for (;;) {
		if (port_get(tcdp->tcd_port, &event, NULL) != 0) {
			_rt_dprintf("port_get on port %d failed with %d <%s>\n",
			    tcdp->tcd_port, errno, strerror(errno));
			break;
		}

		if (event.portev_source == PORT_SOURCE_ALERT) {
			/* Termination request (or an unknown alert). */
			if (event.portev_events != SIGEV_THREAD_TERM)
				errno = EPROTO;
			break;
		}
		if (event.portev_source != PORT_SOURCE_TIMER) {
			_rt_dprintf("port_get on port %d returned %u "
			    "(not PORT_SOURCE_TIMER)\n",
			    tcdp->tcd_port, event.portev_source);
			errno = EPROTO;
			break;
		}

		tcdp->tcd_overruns = event.portev_events - 1;
		if (sigev_add_work(tcdp,
		    tcdp->tcd_notif.sigev_notify_function,
		    tcdp->tcd_notif.sigev_value) != 0)
			break;

		/* Let the job finish before looking for another event. */
		tpool_wait(tcdp->tcd_poolp);
	}

	pthread_cleanup_pop(1);
	return (NULL);
}
/*
 * Start routine of the spawner thread for message-queue notification.
 *
 * Each iteration:
 *   1. waits, under tcd_lock, until tcd_msg_enabled is non-zero and
 *      remembers its value as the notification type;
 *   2. waits on the tcd_msg_avail semaphore, retrying while sem_wait()
 *      returns -1;
 *   3. clears tcd_msg_enabled under the lock;
 *   4. delivers the notification: SIGEV_THREAD queues the user function
 *      on the thread pool, anything else is posted to the event port
 *      with _port_dispatch().
 *
 * The loop ends when delivery fails.  On every exit, including
 * cancellation, the cleanup handler sigev_destroy_pool() runs.
 * Always returns NULL.
 */
void *
mqueue_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	int ret = 0;
	int ntype;
	void (*function)(union sigval);
	union sigval argument;

	/* Destroy the pool when we leave, even if we are cancelled. */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	while (ret == 0) {
		sig_mutex_lock(&tcdp->tcd_lock);
		/* Make sure the lock is dropped if we exit while waiting. */
		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
		while ((ntype = tcdp->tcd_msg_enabled) == 0)
			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
		pthread_cleanup_pop(1);		/* releases tcd_lock */

		while (sem_wait(tcdp->tcd_msg_avail) == -1)
			continue;

		sig_mutex_lock(&tcdp->tcd_lock);
		tcdp->tcd_msg_enabled = 0;
		sig_mutex_unlock(&tcdp->tcd_lock);

		if (ntype == SIGEV_THREAD) {
			function = tcdp->tcd_notif.sigev_notify_function;
			argument.sival_ptr = tcdp->tcd_msg_userval;
			ret = sigev_add_work(tcdp, function, argument);
		} else {
			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
			    0, (uintptr_t)tcdp->tcd_msg_object,
			    tcdp->tcd_msg_userval);
		}
	}

	/*
	 * tcd_lock is NOT held here: every path out of the loop has
	 * already released it.  It must not be unlocked again; unlocking
	 * a mutex this thread does not own could release it from under
	 * another thread (e.g. one in del_sigev_mq()).  The cleanup
	 * handler below takes the lock itself.
	 */
	pthread_cleanup_pop(1);
	return (NULL);
}
/*
 * Start routine of the spawner thread for SIGEV_THREAD AIO completion.
 *
 * Repeatedly fetches an event from the tcd's event port, extracts the
 * notification function and thread attributes from the object attached
 * to the event (a struct sigevent for the list-I/O events, an aiocb for
 * the single-request events) and runs the function either on the thread
 * pool or, when the requested attributes differ from the pool's, in a
 * dedicated detached daemon thread.
 *
 * The loop ends when port_get() fails, on an alert or an event from an
 * unexpected source, or when a notification cannot be delivered for a
 * reason other than EAGAIN/ENOMEM.  On every exit, including
 * cancellation, the cleanup handler sigev_destroy_pool() runs.
 * Always returns NULL.
 */
void *
aio_spawner(void *arg)
{
thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
int error = 0;
void (*function)(union sigval);
union sigval argument;
port_event_t port_event;
struct sigevent *sigevp;
timespec_t delta;
pthread_attr_t *attrp;
/* Destroy the pool when we leave, even if we are cancelled. */
pthread_cleanup_push(sigev_destroy_pool, tcdp);
while (error == 0) {
if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
error = errno;
_rt_dprintf("port_get on port %d failed with %d <%s>\n",
tcdp->tcd_port, error, strerror(error));
break;
}
switch (port_event.portev_source) {
case PORT_SOURCE_AIO:
break;
case PORT_SOURCE_ALERT:
/* Termination request (or an unknown alert): stop. */
if (port_event.portev_events != SIGEV_THREAD_TERM)
errno = EPROTO;
goto out;
default:
_rt_dprintf("port_get on port %d returned %u "
"(not PORT_SOURCE_AIO)\n",
tcdp->tcd_port, port_event.portev_source);
errno = EPROTO;
goto out;
}
/* The user value attached to the event is the callback argument. */
argument.sival_ptr = port_event.portev_user;
/* The event type tells us what kind of object portev_object is. */
switch (port_event.portev_events) {
case AIOLIO:
#if !defined(_LP64)
case AIOLIO64:
#endif
/* List I/O: the object is the struct sigevent itself. */
sigevp = (struct sigevent *)port_event.portev_object;
function = sigevp->sigev_notify_function;
attrp = sigevp->sigev_notify_attributes;
break;
case AIOAREAD:
case AIOAWRITE:
case AIOFSYNC:
{
/* Single request: the object is the aiocb. */
aiocb_t *aiocbp =
(aiocb_t *)port_event.portev_object;
function = aiocbp->aio_sigevent.sigev_notify_function;
attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
break;
}
#if !defined(_LP64)
case AIOAREAD64:
case AIOAWRITE64:
case AIOFSYNC64:
{
/* Single large-file request: the object is an aiocb64. */
aiocb64_t *aiocbp =
(aiocb64_t *)port_event.portev_object;
function = aiocbp->aio_sigevent.sigev_notify_function;
attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
break;
}
#endif
default:
/* Unknown event type: rejected as EINVAL below. */
function = NULL;
attrp = NULL;
break;
}
if (function == NULL)
error = EINVAL;
else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
/* Same attributes as the pool's workers: use the pool. */
error = sigev_add_work(tcdp, function, argument);
else {
/*
 * The attributes do not match the pool's.
 * Run the callback in a thread of its own, created with a
 * private copy of the requested attributes.
 */
pthread_attr_t local_attr;
sigev_thread_data_t *stdp;
if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
error = ENOMEM;
else
error = pthread_attr_clone(&local_attr, attrp);
if (error == 0) {
(void) pthread_attr_setdetachstate(
&local_attr, PTHREAD_CREATE_DETACHED);
(void) pthread_attr_setdaemonstate_np(
&local_attr, PTHREAD_CREATE_DAEMON_NP);
stdp->std_func = function;
stdp->std_arg = argument;
/* On success notify_thread() owns and frees stdp. */
error = pthread_create(NULL, &local_attr,
notify_thread, stdp);
(void) pthread_attr_destroy(&local_attr);
}
/* The thread was not created: the request is still ours. */
if (error && stdp != NULL)
lfree(stdp, sizeof (*stdp));
}
if (error) {
_rt_dprintf("Cannot add work, error=%d <%s>.\n",
error, strerror(error));
if (error == EAGAIN || error == ENOMEM) {
/*
 * Resource shortage: post the same event back to the
 * port so it is fetched again, pause for NANOSEC / 20
 * nanoseconds and keep going.  Any other error is left
 * in 'error' and terminates the loop.
 */
if (_port_dispatch(tcdp->tcd_port, 0,
PORT_SOURCE_AIO, port_event.portev_events,
port_event.portev_object,
port_event.portev_user) != 0)
break;
error = 0;
delta.tv_sec = 0;
delta.tv_nsec = NANOSEC / 20;
(void) nanosleep(&delta, NULL);
}
}
}
out:
pthread_cleanup_pop(1);
return (NULL);
}
/*
 * Allocate a thread_communication_data_t for the given subsystem.
 *
 * The subsystem is recorded, the port is marked as not yet created
 * (-1), and the lock and condition variable are initialized as
 * USYNC_THREAD objects.  All other fields are left as lmalloc()
 * returned them.  Returns NULL if the allocation fails.
 */
static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)
{
	thread_communication_data_t *handler = lmalloc(sizeof (*handler));

	if (handler == NULL)
		return (NULL);

	handler->tcd_subsystem = caller;
	handler->tcd_port = -1;
	(void) mutex_init(&handler->tcd_lock, USYNC_THREAD, NULL);
	(void) cond_init(&handler->tcd_cv, USYNC_THREAD, NULL);
	return (handler);
}
/*
 * Release a thread_communication_data_t and what it owns.
 *
 * Destroys the cloned thread attributes (if any), wipes the saved
 * sigevent, closes the event port for the TIMER and AIO subsystems
 * (when one was created), resets the message-queue fields for MQ, and
 * finally frees the structure.  'tcdp' must not be used afterwards.
 */
void
free_sigev_handler(thread_communication_data_t *tcdp)
{
	pthread_attr_t *attrs = tcdp->tcd_attrp;

	if (attrs != NULL) {
		(void) pthread_attr_destroy(attrs);
		tcdp->tcd_attrp = NULL;
	}
	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));

	if (tcdp->tcd_subsystem == MQ) {
		tcdp->tcd_msg_avail = NULL;
		tcdp->tcd_msg_object = NULL;
		tcdp->tcd_msg_userval = NULL;
		tcdp->tcd_msg_enabled = 0;
	} else if (tcdp->tcd_subsystem == TIMER ||
	    tcdp->tcd_subsystem == AIO) {
		/* A negative port means port_create() never succeeded. */
		if (tcdp->tcd_port >= 0)
			(void) close(tcdp->tcd_port);
	}

	lfree(tcdp, sizeof (*tcdp));
}
/*
 * Build the communication data for a SIGEV_THREAD notification.
 *
 * sigevp  the caller's sigevent; it is copied, not referenced.
 * caller  the subsystem (TIMER, MQ or AIO) requesting the handler.
 *
 * The user's thread attributes, if any, are cloned into the tcd so the
 * tcd does not depend on the lifetime of the caller's pthread_attr_t.
 * For TIMER and AIO an event port is created and marked close-on-exec.
 *
 * Returns the new tcd, or NULL with errno set:
 *   EINVAL  sigevp is NULL
 *   ENOMEM  the tcd could not be allocated
 *   EBADF   the port could not be created or marked close-on-exec
 *   (other) the error returned by pthread_attr_clone()
 */
thread_communication_data_t *
setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
{
	thread_communication_data_t *tcdp;
	int error;

	if (sigevp == NULL) {
		errno = EINVAL;
		return (NULL);
	}

	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
		errno = ENOMEM;
		return (NULL);
	}

	if (sigevp->sigev_notify_attributes == NULL) {
		tcdp->tcd_attrp = NULL;		/* default attributes */
	} else {
		/*
		 * Copying the attribute pointer is not enough: clone the
		 * user's attributes into storage owned by the tcd.
		 */
		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
		error = pthread_attr_clone(tcdp->tcd_attrp,
		    sigevp->sigev_notify_attributes);
		if (error) {
			/* Nothing was cloned, so there is nothing to destroy. */
			tcdp->tcd_attrp = NULL;
			free_sigev_handler(tcdp);
			errno = error;
			return (NULL);
		}
	}
	tcdp->tcd_notif = *sigevp;
	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;

	if (caller == TIMER || caller == AIO) {
		/*
		 * The port must not leak across exec().  FD_CLOEXEC is a
		 * flag value, not an fcntl() command; it has to be applied
		 * with F_SETFD.
		 */
		if ((tcdp->tcd_port = port_create()) < 0 ||
		    fcntl(tcdp->tcd_port, F_SETFD, FD_CLOEXEC) == -1) {
			/* free_sigev_handler() closes the port if it exists. */
			free_sigev_handler(tcdp);
			errno = EBADF;
			return (NULL);
		}
	}
	return (tcdp);
}
/*
 * Create the thread pool and the spawner thread for a tcd.
 *
 * The spawner routine and the pool's maximum worker count depend on
 * the subsystem: one worker for TIMER and MQ, up to 100 for AIO.
 * The spawner is created detached, as a daemon thread, and with every
 * signal blocked; the caller's signal mask is restored afterwards.
 *
 * Returns 0 on success.  Returns -1 if the subsystem is unknown, the
 * pool cannot be created, or the thread cannot be created (in which
 * case the pool is destroyed again).
 */
int
launch_spawner(thread_communication_data_t *tcdp)
{
	void *(*entry)(void *);
	int worker_limit;
	sigset_t all_signals;
	sigset_t saved_mask;
	int rc;

	if (tcdp->tcd_subsystem == TIMER) {
		entry = timer_spawner;
		worker_limit = 1;
	} else if (tcdp->tcd_subsystem == MQ) {
		entry = mqueue_spawner;
		worker_limit = 1;
	} else if (tcdp->tcd_subsystem == AIO) {
		entry = aio_spawner;
		worker_limit = 100;
	} else {
		return (-1);
	}

	tcdp->tcd_poolp = tpool_create(1, worker_limit, 20,
	    tcdp->tcd_notif.sigev_notify_attributes);
	if (tcdp->tcd_poolp == NULL)
		return (-1);

	/* Create the spawner with all signals blocked. */
	(void) sigfillset(&all_signals);
	(void) thr_sigsetmask(SIG_SETMASK, &all_signals, &saved_mask);
	rc = thr_create(NULL, 0, entry, tcdp,
	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
	(void) thr_sigsetmask(SIG_SETMASK, &saved_mask, NULL);

	if (rc == 0)
		return (0);

	tpool_destroy(tcdp->tcd_poolp);
	tcdp->tcd_poolp = NULL;
	return (-1);
}
/*
 * Detach the SIGEV_THREAD machinery from a timer that is going away.
 *
 * If the timer has a tcd, a SIGEV_THREAD_TERM alert is raised on its
 * event port (when the port exists) and the timer's slot in timer_tcd[]
 * is cleared, all under the tcd lock.
 *
 * Returns the result of port_alert(), or 0 when there was nothing to
 * do (timer out of range, no tcd, or no port).
 */
int
del_sigev_timer(timer_t timer)
{
	thread_communication_data_t *tcdp;
	int rc = 0;

	if ((uint_t)timer >= timer_max)
		return (0);
	if ((tcdp = timer_tcd[timer]) == NULL)
		return (0);

	sig_mutex_lock(&tcdp->tcd_lock);
	if (tcdp->tcd_port >= 0) {
		rc = port_alert(tcdp->tcd_port,
		    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL);
		if (rc == 0)
			_rt_dprintf("del_sigev_timer(%d) OK.\n", timer);
	}
	timer_tcd[timer] = NULL;
	sig_mutex_unlock(&tcdp->tcd_lock);

	return (rc);
}
int
sigev_timer_getoverrun(timer_t timer)
{
thread_communication_data_t *tcdp;
if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
return (tcdp->tcd_overruns);
return (0);
}
/*
 * Cleanup handler used by del_sigev_mq(): releases tcd_lock, which
 * del_sigev_mq() holds when this runs, and then frees the tcd.
 */
static void
del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
{
sig_mutex_unlock(&tcdp->tcd_lock);
free_sigev_handler(tcdp);
}
/*
 * Shut down the message-queue spawner thread and free its tcd.
 *
 * Marks the tcd as closing, cancels the spawner, and waits on tcd_cv
 * until the spawner's cleanup handler (sigev_destroy_pool) has cleared
 * tcd_server_id.  The tcd is then unlocked and freed by
 * del_sigev_mq_cleanup().
 *
 * If the spawner cannot be cancelled, the function only logs the error
 * and returns; in that case the tcd is not freed here.
 */
void
del_sigev_mq(thread_communication_data_t *tcdp)
{
pthread_t server_id;
int rc;
sig_mutex_lock(&tcdp->tcd_lock);
server_id = tcdp->tcd_server_id;
/* Tells sigev_destroy_pool() to leave freeing the tcd to us. */
tcdp->tcd_msg_closing = 1;
if ((rc = pthread_cancel(server_id)) != 0) {
sig_mutex_unlock(&tcdp->tcd_lock);
_rt_dprintf("Fail to cancel %u with error %d <%s>.\n",
server_id, rc, strerror(rc));
return;
}
/*
 * The handler is pushed so that the unlock and the free also run if
 * this thread exits while waiting; pthread_cleanup_pop(1) runs it
 * on the normal path.
 */
pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
while (tcdp->tcd_server_id == server_id)
(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
pthread_cleanup_pop(1);
}
/*
 * Make sure the process-wide AIO spawner exists and hand its event
 * port to the caller.
 *
 * Does nothing and returns 0 unless 'sigevp' requests SIGEV_THREAD
 * notification with a non-NULL notification function.  Otherwise the
 * port number of the shared AIO tcd is stored in sigevp->sigev_signo
 * (-1 on failure) and 0 is returned on success, -1 on failure.
 *
 * The first caller creates the tcd and launches the spawner.  While it
 * does so, sigev_aio_busy is set and the lock is dropped; concurrent
 * callers wait on sigev_aio_cv until the outcome has been published in
 * sigev_aio_tcd.
 */
int
_aio_sigev_thread_init(struct sigevent *sigevp)
{
static mutex_t sigev_aio_lock = DEFAULTMUTEX;
static cond_t sigev_aio_cv = DEFAULTCV;
static int sigev_aio_busy = 0;
thread_communication_data_t *tcdp;
int port;
int cancel_state;
int rc = 0;
if (sigevp == NULL ||
sigevp->sigev_notify != SIGEV_THREAD ||
sigevp->sigev_notify_function == NULL)
return (0);
lmutex_lock(&sigev_aio_lock);
/* Cancellation is disabled for the duration of the wait. */
(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
while (sigev_aio_busy)
(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
(void) pthread_setcancelstate(cancel_state, NULL);
if ((tcdp = sigev_aio_tcd) != NULL)
/* The spawner already exists: reuse its port. */
port = tcdp->tcd_port;
else {
/* We are the creator; keep other callers waiting meanwhile. */
sigev_aio_busy = 1;
lmutex_unlock(&sigev_aio_lock);
tcdp = setup_sigev_handler(sigevp, AIO);
if (tcdp == NULL) {
port = -1;
rc = -1;
} else if (launch_spawner(tcdp) != 0) {
free_sigev_handler(tcdp);
tcdp = NULL;
port = -1;
rc = -1;
} else {
port = tcdp->tcd_port;
}
/* Publish the result (NULL on failure) and wake the waiters. */
lmutex_lock(&sigev_aio_lock);
sigev_aio_tcd = tcdp;
sigev_aio_busy = 0;
(void) cond_broadcast(&sigev_aio_cv);
}
lmutex_unlock(&sigev_aio_lock);
/* The port number is handed back through sigev_signo. */
sigevp->sigev_signo = port;
return (rc);
}
/*
 * Prepare SIGEV_THREAD notification for the request described by
 * 'aiocbp'.  A NULL aiocb is accepted and yields 0; otherwise the
 * result of _aio_sigev_thread_init() on its sigevent is returned.
 */
int
_aio_sigev_thread(aiocb_t *aiocbp)
{
	return (aiocbp == NULL ? 0 :
	    _aio_sigev_thread_init(&aiocbp->aio_sigevent));
}
#if !defined(_LP64)
/*
 * Large-file variant of _aio_sigev_thread(), compiled only when _LP64
 * is not defined.  A NULL aiocb64 is accepted and yields 0; otherwise
 * the result of _aio_sigev_thread_init() on its sigevent is returned.
 */
int
_aio_sigev_thread64(aiocb64_t *aiocbp)
{
	return (aiocbp == NULL ? 0 :
	    _aio_sigev_thread_init(&aiocbp->aio_sigevent));
}
#endif
/*
 * Discard the AIO tcd, if one exists: the global pointer is cleared
 * first and the tcd is then released with tcd_teardown().
 * Does nothing when no AIO tcd has been set up.
 */
void
postfork1_child_sigev_aio(void)
{
	thread_communication_data_t *stale = sigev_aio_tcd;

	if (stale == NULL)
		return;

	sigev_aio_tcd = NULL;
	tcd_teardown(stale);
}
/*
 * Release a tcd without involving its spawner thread: abandon the
 * thread pool (if any), clear the recorded spawner thread id, and free
 * the tcd with free_sigev_handler().  'tcdp' must not be used
 * afterwards.
 */
void
tcd_teardown(thread_communication_data_t *tcdp)
{
if (tcdp->tcd_poolp != NULL)
tpool_abandon(tcdp->tcd_poolp);
tcdp->tcd_poolp = NULL;
tcdp->tcd_server_id = 0;
free_sigev_handler(tcdp);
}