#include <signal.h>
#include <dirent.h>
#include <limits.h>
#include <alloca.h>
#include <unistd.h>
#include <stdio.h>
#include <fmd_string.h>
#include <fmd_alloc.h>
#include <fmd_module.h>
#include <fmd_error.h>
#include <fmd_conf.h>
#include <fmd_dispq.h>
#include <fmd_eventq.h>
#include <fmd_timerq.h>
#include <fmd_subr.h>
#include <fmd_thread.h>
#include <fmd_ustat.h>
#include <fmd_case.h>
#include <fmd_protocol.h>
#include <fmd_buf.h>
#include <fmd_ckpt.h>
#include <fmd_xprt.h>
#include <fmd_topo.h>
#include <fmd.h>
static const fmd_modstat_t _fmd_modstat_tmpl = {
{
{ "fmd.dispatched", FMD_TYPE_UINT64, "total events dispatched to module" },
{ "fmd.dequeued", FMD_TYPE_UINT64, "total events dequeued by module" },
{ "fmd.prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by module" },
{ "fmd.dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
{ "fmd.wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
{ "fmd.wtime", FMD_TYPE_TIME, "total wait time on queue" },
{ "fmd.wlentime", FMD_TYPE_TIME, "total wait length * time product" },
{ "fmd.wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
{ "fmd.dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
{ "fmd.dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
},
{ "fmd.loadtime", FMD_TYPE_TIME, "hrtime at which module was loaded" },
{ "fmd.snaptime", FMD_TYPE_TIME, "hrtime of last statistics snapshot" },
{ "fmd.accepted", FMD_TYPE_UINT64, "total events accepted by module" },
{ "fmd.debugdrop", FMD_TYPE_UINT64, "dropped debug messages" },
{ "fmd.memtotal", FMD_TYPE_SIZE, "total memory allocated by module" },
{ "fmd.memlimit", FMD_TYPE_SIZE, "limit on total memory allocated" },
{ "fmd.buftotal", FMD_TYPE_SIZE, "total buffer space used by module" },
{ "fmd.buflimit", FMD_TYPE_SIZE, "limit on total buffer space" },
{ "fmd.thrtotal", FMD_TYPE_UINT32, "total number of auxiliary threads" },
{ "fmd.thrlimit", FMD_TYPE_UINT32, "limit on number of auxiliary threads" },
{ "fmd.doorthrtotal", FMD_TYPE_UINT32, "total number of door server threads" },
{ "fmd.doorthrlimit", FMD_TYPE_UINT32, "limit on door server threads" },
{ "fmd.caseopen", FMD_TYPE_UINT64, "cases currently open by module" },
{ "fmd.casesolved", FMD_TYPE_UINT64, "total cases solved by module" },
{ "fmd.caseclosed", FMD_TYPE_UINT64, "total cases closed by module" },
{ "fmd.ckptsave", FMD_TYPE_BOOL, "save checkpoints for module" },
{ "fmd.ckptrestore", FMD_TYPE_BOOL, "restore checkpoints for module" },
{ "fmd.ckptzero", FMD_TYPE_BOOL, "zeroed checkpoint at startup" },
{ "fmd.ckptcnt", FMD_TYPE_UINT64, "number of checkpoints taken" },
{ "fmd.ckpttime", FMD_TYPE_TIME, "total checkpoint time" },
{ "fmd.xprtopen", FMD_TYPE_UINT32, "total number of open transports" },
{ "fmd.xprtlimit", FMD_TYPE_UINT32, "limit on number of open transports" },
{ "fmd.xprtqlimit", FMD_TYPE_UINT32, "limit on transport event queue length" },
};
static void
fmd_module_start(void *arg)
{
fmd_module_t *mp = arg;
fmd_event_t *ep;
fmd_xprt_t *xp;
(void) pthread_mutex_lock(&mp->mod_lock);
if (mp->mod_ops->mop_init(mp) != 0 || mp->mod_error != 0) {
if (mp->mod_error == 0)
mp->mod_error = errno ? errno : EFMD_MOD_INIT;
goto out;
}
if (fmd.d_mod_event != NULL)
fmd_eventq_insert_at_head(mp->mod_queue, fmd.d_mod_event);
ASSERT(MUTEX_HELD(&mp->mod_lock));
mp->mod_flags |= FMD_MOD_INIT;
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
for (xp = fmd_list_next(&mp->mod_transports);
xp != NULL; xp = fmd_list_next(xp))
fmd_xprt_xresume(xp, FMD_XPRT_ISUSPENDED);
while ((ep = fmd_eventq_delete(mp->mod_queue)) != NULL) {
if (mp->mod_error != 0) {
fmd_eventq_done(mp->mod_queue);
fmd_event_rele(ep);
continue;
}
mp->mod_ops->mop_dispatch(mp, ep);
fmd_eventq_done(mp->mod_queue);
fmd_module_lock(mp);
if (FMD_EVENT_TYPE(ep) == FMD_EVT_CLOSE)
fmd_case_delete(FMD_EVENT_DATA(ep));
fmd_ckpt_save(mp);
fmd_module_unlock(mp);
fmd_event_rele(ep);
}
if (mp->mod_ops->mop_fini(mp) != 0 && mp->mod_error == 0)
mp->mod_error = errno ? errno : EFMD_MOD_FINI;
(void) pthread_mutex_lock(&mp->mod_lock);
mp->mod_flags |= FMD_MOD_FINI;
out:
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
}
fmd_module_t *
fmd_module_create(const char *path, const fmd_modops_t *ops)
{
fmd_module_t *mp = fmd_zalloc(sizeof (fmd_module_t), FMD_SLEEP);
char buf[PATH_MAX], *p;
const char *dir;
uint32_t limit;
int err;
(void) strlcpy(buf, fmd_strbasename(path), sizeof (buf));
if ((p = strrchr(buf, '.')) != NULL && strcmp(p, ".so") == 0)
*p = '\0';
(void) pthread_mutex_init(&mp->mod_lock, NULL);
(void) pthread_cond_init(&mp->mod_cv, NULL);
(void) pthread_mutex_init(&mp->mod_stats_lock, NULL);
mp->mod_name = fmd_strdup(buf, FMD_SLEEP);
mp->mod_path = fmd_strdup(path, FMD_SLEEP);
mp->mod_ops = ops;
mp->mod_ustat = fmd_ustat_create();
(void) fmd_conf_getprop(fmd.d_conf, "ckpt.dir", &dir);
(void) snprintf(buf, sizeof (buf),
"%s/%s/%s", fmd.d_rootdir, dir, mp->mod_name);
mp->mod_ckpt = fmd_strdup(buf, FMD_SLEEP);
(void) fmd_conf_getprop(fmd.d_conf, "client.tmrlim", &limit);
mp->mod_timerids = fmd_idspace_create(mp->mod_name, 1, limit + 1);
mp->mod_threads = fmd_idspace_create(mp->mod_name, 0, INT_MAX);
fmd_buf_hash_create(&mp->mod_bufs);
fmd_serd_hash_create(&mp->mod_serds);
mp->mod_topo_current = fmd_topo_hold();
(void) pthread_mutex_lock(&fmd.d_mod_lock);
fmd_list_append(&fmd.d_mod_list, mp);
(void) pthread_mutex_unlock(&fmd.d_mod_lock);
if ((mp->mod_stats = (fmd_modstat_t *)fmd_ustat_insert(mp->mod_ustat,
FMD_USTAT_ALLOC, sizeof (_fmd_modstat_tmpl) / sizeof (fmd_stat_t),
(fmd_stat_t *)&_fmd_modstat_tmpl, NULL)) == NULL) {
fmd_error(EFMD_MOD_INIT, "failed to initialize per-mod stats");
fmd_module_destroy(mp);
return (NULL);
}
if (nv_alloc_init(&mp->mod_nva_sleep,
&fmd_module_nva_ops_sleep, mp) != 0 ||
nv_alloc_init(&mp->mod_nva_nosleep,
&fmd_module_nva_ops_nosleep, mp) != 0) {
fmd_error(EFMD_MOD_INIT, "failed to initialize nvlist "
"allocation routines");
fmd_module_destroy(mp);
return (NULL);
}
(void) fmd_conf_getprop(fmd.d_conf, "client.evqlim", &limit);
mp->mod_queue = fmd_eventq_create(mp,
&mp->mod_stats->ms_evqstat, &mp->mod_stats_lock, limit);
(void) fmd_conf_getprop(fmd.d_conf, "client.memlim",
&mp->mod_stats->ms_memlimit.fmds_value.ui64);
(void) fmd_conf_getprop(fmd.d_conf, "client.buflim",
&mp->mod_stats->ms_buflimit.fmds_value.ui64);
(void) fmd_conf_getprop(fmd.d_conf, "client.thrlim",
&mp->mod_stats->ms_thrlimit.fmds_value.ui32);
(void) fmd_conf_getprop(fmd.d_conf, "client.doorthrlim",
&mp->mod_stats->ms_doorthrlimit.fmds_value.ui32);
(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlim",
&mp->mod_stats->ms_xprtlimit.fmds_value.ui32);
(void) fmd_conf_getprop(fmd.d_conf, "client.xprtqlim",
&mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
(void) fmd_conf_getprop(fmd.d_conf, "ckpt.save",
&mp->mod_stats->ms_ckpt_save.fmds_value.bool);
(void) fmd_conf_getprop(fmd.d_conf, "ckpt.restore",
&mp->mod_stats->ms_ckpt_restore.fmds_value.bool);
(void) fmd_conf_getprop(fmd.d_conf, "ckpt.zero",
&mp->mod_stats->ms_ckpt_zeroed.fmds_value.bool);
if (mp->mod_stats->ms_ckpt_zeroed.fmds_value.bool)
fmd_ckpt_delete(mp);
fmd_module_hold(mp);
(void) pthread_mutex_lock(&mp->mod_lock);
mp->mod_stats->ms_loadtime.fmds_value.ui64 = gethrtime();
mp->mod_thread = fmd_thread_create(mp, fmd_module_start, mp);
if (mp->mod_thread == NULL) {
fmd_error(EFMD_MOD_THR, "failed to create thread for %s", path);
(void) pthread_mutex_unlock(&mp->mod_lock);
fmd_module_rele(mp);
return (NULL);
}
while (!(mp->mod_flags & FMD_MOD_INIT) && mp->mod_error == 0)
(void) pthread_cond_wait(&mp->mod_cv, &mp->mod_lock);
if (!(mp->mod_flags & FMD_MOD_INIT)) {
err = mp->mod_error;
(void) pthread_mutex_unlock(&mp->mod_lock);
if (err == EFMD_CKPT_INVAL)
fmd_ckpt_rename(mp);
if (fmd.d_fg || err != EFMD_HDL_INIT) {
fmd_error(EFMD_MOD_INIT, "failed to load %s: %s\n",
path, fmd_strerror(err));
}
fmd_module_unload(mp);
fmd_module_rele(mp);
(void) fmd_set_errno(err);
return (NULL);
}
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
fmd_dprintf(FMD_DBG_MOD, "loaded module %s\n", mp->mod_name);
return (mp);
}
static void
fmd_module_untimeout(fmd_idspace_t *ids, id_t id, fmd_module_t *mp)
{
void *arg = fmd_timerq_remove(fmd.d_timers, ids, id);
if (arg != NULL && mp != fmd.d_rmod)
fmd_free(arg, sizeof (fmd_modtimer_t));
}
void
fmd_module_unload(fmd_module_t *mp)
{
fmd_modtopo_t *mtp;
(void) pthread_mutex_lock(&mp->mod_lock);
if (mp->mod_flags & FMD_MOD_QUIT) {
(void) pthread_mutex_unlock(&mp->mod_lock);
return;
}
ASSERT(mp->mod_thread != NULL);
mp->mod_flags |= FMD_MOD_QUIT;
if (mp->mod_queue != NULL)
fmd_eventq_abort(mp->mod_queue);
while ((mp->mod_flags & (FMD_MOD_INIT | FMD_MOD_FINI)) == FMD_MOD_INIT)
(void) pthread_cond_wait(&mp->mod_cv, &mp->mod_lock);
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
fmd_thread_destroy(mp->mod_thread, FMD_THREAD_JOIN);
mp->mod_thread = NULL;
fmd_module_lock(mp);
if (mp->mod_timerids != NULL) {
fmd_idspace_apply(mp->mod_timerids,
(void (*)())fmd_module_untimeout, mp);
fmd_idspace_destroy(mp->mod_timerids);
mp->mod_timerids = NULL;
}
if (mp->mod_threads != NULL) {
fmd_idspace_destroy(mp->mod_threads);
mp->mod_threads = NULL;
}
(void) fmd_buf_hash_destroy(&mp->mod_bufs);
fmd_serd_hash_destroy(&mp->mod_serds);
while ((mtp = fmd_list_next(&mp->mod_topolist)) != NULL) {
fmd_list_delete(&mp->mod_topolist, mtp);
fmd_topo_rele(mtp->mt_topo);
fmd_free(mtp, sizeof (fmd_modtopo_t));
}
fmd_module_unlock(mp);
fmd_dprintf(FMD_DBG_MOD, "unloaded module %s\n", mp->mod_name);
}
void
fmd_module_destroy(fmd_module_t *mp)
{
fmd_conf_formal_t *cfp = mp->mod_argv;
int i;
ASSERT(MUTEX_HELD(&mp->mod_lock));
if (mp->mod_thread != NULL) {
(void) pthread_mutex_unlock(&mp->mod_lock);
fmd_module_unload(mp);
(void) pthread_mutex_lock(&mp->mod_lock);
}
ASSERT(mp->mod_thread == NULL);
ASSERT(mp->mod_refs == 0);
(void) pthread_mutex_lock(&fmd.d_mod_lock);
fmd_list_delete(&fmd.d_mod_list, mp);
(void) pthread_mutex_unlock(&fmd.d_mod_lock);
if (mp->mod_topo_current != NULL)
fmd_topo_rele(mp->mod_topo_current);
if (mp->mod_nva_sleep.nva_ops != NULL)
nv_alloc_fini(&mp->mod_nva_sleep);
if (mp->mod_nva_nosleep.nva_ops != NULL)
nv_alloc_fini(&mp->mod_nva_nosleep);
if (mp->mod_queue != NULL) {
fmd_eventq_destroy(mp->mod_queue);
mp->mod_queue = NULL;
}
if (mp->mod_ustat != NULL) {
(void) pthread_mutex_lock(&mp->mod_stats_lock);
fmd_ustat_destroy(mp->mod_ustat);
mp->mod_ustat = NULL;
mp->mod_stats = NULL;
(void) pthread_mutex_unlock(&mp->mod_stats_lock);
}
for (i = 0; i < mp->mod_dictc; i++)
fm_dc_closedict(mp->mod_dictv[i]);
fmd_free(mp->mod_dictv, sizeof (struct fm_dc_handle *) * mp->mod_dictc);
if (mp->mod_conf != NULL)
fmd_conf_close(mp->mod_conf);
for (i = 0; i < mp->mod_argc; i++, cfp++) {
fmd_strfree((char *)cfp->cf_name);
fmd_strfree((char *)cfp->cf_default);
}
fmd_free(mp->mod_argv, sizeof (fmd_conf_formal_t) * mp->mod_argc);
fmd_strfree(mp->mod_name);
fmd_strfree(mp->mod_path);
fmd_strfree(mp->mod_ckpt);
nvlist_free(mp->mod_fmri);
fmd_strfree(mp->mod_vers);
fmd_free(mp, sizeof (fmd_module_t));
}
static void
fmd_module_error(fmd_module_t *mp, int err)
{
fmd_event_t *e;
nvlist_t *nvl;
char *class;
ASSERT(MUTEX_HELD(&mp->mod_lock));
ASSERT(err != 0);
TRACE((FMD_DBG_MOD, "module aborted: err=%d", err));
if (mp->mod_error == 0)
mp->mod_error = err;
if (mp == fmd.d_self)
return;
nvl = fmd_protocol_moderror(mp, EFMD_MOD_FAIL, fmd_strerror(err));
(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
fmd_dispq_dispatch(fmd.d_disp, e, class);
}
void
fmd_module_dispatch(fmd_module_t *mp, fmd_event_t *e)
{
const fmd_hdl_ops_t *ops = mp->mod_info->fmdi_ops;
fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
fmd_hdl_t *hdl = (fmd_hdl_t *)mp;
fmd_modtimer_t *t;
fmd_topo_t *old_topo;
volatile int err;
(void) pthread_mutex_lock(&mp->mod_lock);
ASSERT(!(mp->mod_flags & FMD_MOD_BUSY));
mp->mod_flags |= FMD_MOD_BUSY;
if ((err = setjmp(mp->mod_jmpbuf)) != 0) {
(void) pthread_mutex_lock(&mp->mod_lock);
fmd_module_error(mp, err);
}
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
if (err == 0) {
switch (ep->ev_type) {
case FMD_EVT_PROTOCOL:
ops->fmdo_recv(hdl, e, ep->ev_nvl, ep->ev_data);
break;
case FMD_EVT_TIMEOUT:
t = ep->ev_data;
ASSERT(t->mt_mod == mp);
ops->fmdo_timeout(hdl, t->mt_id, t->mt_arg);
break;
case FMD_EVT_CLOSE:
ops->fmdo_close(hdl, ep->ev_data);
break;
case FMD_EVT_STATS:
ops->fmdo_stats(hdl);
fmd_modstat_publish(mp);
break;
case FMD_EVT_GC:
ops->fmdo_gc(hdl);
break;
case FMD_EVT_PUBLISH:
fmd_case_publish(ep->ev_data, FMD_CASE_CURRENT);
break;
case FMD_EVT_TOPO:
old_topo = mp->mod_topo_current;
fmd_module_lock(mp);
mp->mod_topo_current = (fmd_topo_t *)ep->ev_data;
fmd_topo_addref(mp->mod_topo_current);
fmd_module_unlock(mp);
fmd_topo_rele(old_topo);
ops->fmdo_topo(hdl, mp->mod_topo_current->ft_hdl);
break;
}
}
fmd_module_exit(mp);
}
int
fmd_module_transport(fmd_module_t *mp, fmd_xprt_t *xp, fmd_event_t *e)
{
fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
fmd_hdl_t *hdl = (fmd_hdl_t *)mp;
ASSERT(ep->ev_type == FMD_EVT_PROTOCOL);
return (mp->mod_info->fmdi_ops->fmdo_send(hdl, xp, e, ep->ev_nvl));
}
void
fmd_module_timeout(fmd_modtimer_t *t, id_t id, hrtime_t hrt)
{
fmd_event_t *e;
t->mt_id = id;
e = fmd_event_create(FMD_EVT_TIMEOUT, hrt, NULL, t);
fmd_eventq_insert_at_time(t->mt_mod->mod_queue, e);
}
void
fmd_module_gc(fmd_module_t *mp)
{
fmd_hdl_info_t *info;
fmd_event_t *e;
if (mp->mod_error != 0)
return;
fmd_module_lock(mp);
if ((info = mp->mod_info) != NULL) {
fmd_serd_hash_apply(&mp->mod_serds, fmd_serd_eng_gc, NULL);
}
fmd_module_unlock(mp);
if (info != NULL) {
e = fmd_event_create(FMD_EVT_GC, FMD_HRT_NOW, NULL, NULL);
fmd_eventq_insert_at_head(mp->mod_queue, e);
}
}
void
fmd_module_trygc(fmd_module_t *mp)
{
if (fmd_module_trylock(mp)) {
fmd_serd_hash_apply(&mp->mod_serds, fmd_serd_eng_gc, NULL);
fmd_module_unlock(mp);
}
}
int
fmd_module_contains(fmd_module_t *mp, fmd_event_t *ep)
{
fmd_case_t *cp;
int rv = 0;
fmd_module_lock(mp);
for (cp = fmd_list_next(&mp->mod_cases);
cp != NULL; cp = fmd_list_next(cp)) {
if ((rv = fmd_case_contains(cp, ep)) != 0)
break;
}
if (rv == 0)
rv = fmd_serd_hash_contains(&mp->mod_serds, ep);
fmd_module_unlock(mp);
return (rv);
}
void
fmd_module_setdirty(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
mp->mod_flags |= FMD_MOD_MDIRTY;
(void) pthread_mutex_unlock(&mp->mod_lock);
}
void
fmd_module_setcdirty(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
mp->mod_flags |= FMD_MOD_CDIRTY;
(void) pthread_mutex_unlock(&mp->mod_lock);
}
void
fmd_module_clrdirty(fmd_module_t *mp)
{
fmd_case_t *cp;
fmd_module_lock(mp);
if (mp->mod_flags & FMD_MOD_CDIRTY) {
for (cp = fmd_list_next(&mp->mod_cases);
cp != NULL; cp = fmd_list_next(cp))
fmd_case_clrdirty(cp);
}
if (mp->mod_flags & FMD_MOD_MDIRTY) {
fmd_serd_hash_apply(&mp->mod_serds,
fmd_serd_eng_clrdirty, NULL);
fmd_buf_hash_commit(&mp->mod_bufs);
}
(void) pthread_mutex_lock(&mp->mod_lock);
mp->mod_flags &= ~(FMD_MOD_MDIRTY | FMD_MOD_CDIRTY);
(void) pthread_mutex_unlock(&mp->mod_lock);
fmd_module_unlock(mp);
}
void
fmd_module_commit(fmd_module_t *mp)
{
fmd_case_t *cp;
ASSERT(fmd_module_locked(mp));
if (mp->mod_flags & FMD_MOD_CDIRTY) {
for (cp = fmd_list_next(&mp->mod_cases);
cp != NULL; cp = fmd_list_next(cp))
fmd_case_commit(cp);
}
if (mp->mod_flags & FMD_MOD_MDIRTY) {
fmd_serd_hash_apply(&mp->mod_serds, fmd_serd_eng_commit, NULL);
fmd_buf_hash_commit(&mp->mod_bufs);
}
(void) pthread_mutex_lock(&mp->mod_lock);
mp->mod_flags &= ~(FMD_MOD_MDIRTY | FMD_MOD_CDIRTY);
(void) pthread_mutex_unlock(&mp->mod_lock);
mp->mod_gen++;
}
void
fmd_module_lock(fmd_module_t *mp)
{
pthread_t self = pthread_self();
(void) pthread_mutex_lock(&mp->mod_lock);
while (mp->mod_flags & FMD_MOD_LOCK) {
if (mp->mod_owner != self)
(void) pthread_cond_wait(&mp->mod_cv, &mp->mod_lock);
else
fmd_panic("recursive module lock of %p\n", (void *)mp);
}
mp->mod_owner = self;
mp->mod_flags |= FMD_MOD_LOCK;
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
}
void
fmd_module_unlock(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
ASSERT(mp->mod_owner == pthread_self());
ASSERT(mp->mod_flags & FMD_MOD_LOCK);
mp->mod_owner = 0;
mp->mod_flags &= ~FMD_MOD_LOCK;
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
}
int
fmd_module_trylock(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
if (mp->mod_flags & FMD_MOD_LOCK) {
(void) pthread_mutex_unlock(&mp->mod_lock);
return (0);
}
mp->mod_owner = pthread_self();
mp->mod_flags |= FMD_MOD_LOCK;
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
return (1);
}
int
fmd_module_locked(fmd_module_t *mp)
{
return ((mp->mod_flags & FMD_MOD_LOCK) &&
mp->mod_owner == pthread_self());
}
int
fmd_module_enter(fmd_module_t *mp, void (*func)(fmd_hdl_t *))
{
volatile int err;
(void) pthread_mutex_lock(&mp->mod_lock);
ASSERT(!(mp->mod_flags & FMD_MOD_BUSY));
mp->mod_flags |= FMD_MOD_BUSY;
if ((err = setjmp(mp->mod_jmpbuf)) != 0) {
(void) pthread_mutex_lock(&mp->mod_lock);
fmd_module_error(mp, err);
}
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
if (err == 0 && func != NULL)
(*func)((fmd_hdl_t *)mp);
return (err);
}
void
fmd_module_exit(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
ASSERT(mp->mod_flags & FMD_MOD_BUSY);
mp->mod_flags &= ~FMD_MOD_BUSY;
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
}
void
fmd_module_abort(fmd_module_t *mp, int err)
{
uint_t policy = FMD_CERROR_UNLOAD;
pthread_t tid = pthread_self();
(void) fmd_conf_getprop(fmd.d_conf, "client.error", &policy);
if (policy == FMD_CERROR_STOP) {
fmd_error(err, "stopping after %s in client %s (%p)\n",
fmd_errclass(err), mp->mod_name, (void *)mp);
(void) raise(SIGSTOP);
} else if (policy == FMD_CERROR_ABORT) {
fmd_panic("aborting due to %s in client %s (%p)\n",
fmd_errclass(err), mp->mod_name, (void *)mp);
}
if (tid != mp->mod_thread->thr_tid) {
(void) pthread_mutex_lock(&mp->mod_lock);
if (mp->mod_error == 0)
mp->mod_error = err;
mp->mod_flags |= FMD_MOD_FAIL;
(void) pthread_mutex_unlock(&mp->mod_lock);
(void) pthread_cancel(tid);
pthread_exit(NULL);
}
ASSERT(mp->mod_flags & FMD_MOD_BUSY);
longjmp(mp->mod_jmpbuf, err);
}
void
fmd_module_hold(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
TRACE((FMD_DBG_MOD, "hold %p (%s/%u)\n",
(void *)mp, mp->mod_name, mp->mod_refs));
mp->mod_refs++;
ASSERT(mp->mod_refs != 0);
(void) pthread_mutex_unlock(&mp->mod_lock);
}
void
fmd_module_rele(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
TRACE((FMD_DBG_MOD, "rele %p (%s/%u)\n",
(void *)mp, mp->mod_name, mp->mod_refs));
ASSERT(mp->mod_refs != 0);
if (--mp->mod_refs == 0)
fmd_module_destroy(mp);
else
(void) pthread_mutex_unlock(&mp->mod_lock);
}
int
fmd_module_dc_opendict(fmd_module_t *mp, const char *dict)
{
struct fm_dc_handle *dcp, **dcv;
char *dictdir, *dictnam, *p;
size_t len;
ASSERT(fmd_module_locked(mp));
dictnam = strdupa(fmd_strbasename(dict));
if ((p = strrchr(dictnam, '.')) != NULL &&
strcmp(p, ".dict") == 0)
*p = '\0';
if (dict[0] == '/') {
len = strlen(fmd.d_rootdir) + strlen(dict) + 1;
dictdir = alloca(len);
(void) snprintf(dictdir, len, "%s%s", fmd.d_rootdir, dict);
(void) fmd_strdirname(dictdir);
} else {
(void) fmd_conf_getprop(fmd.d_conf, "dictdir", &p);
len = strlen(fmd.d_rootdir) + strlen(p) + strlen(dict) + 3;
dictdir = alloca(len);
(void) snprintf(dictdir, len,
"%s/%s/%s", fmd.d_rootdir, p, dict);
(void) fmd_strdirname(dictdir);
}
fmd_dprintf(FMD_DBG_MOD, "module %s opening %s -> %s/%s.dict\n",
mp->mod_name, dict, dictdir, dictnam);
if ((dcp = fm_dc_opendict(FM_DC_VERSION, dictdir, dictnam)) == NULL)
return (-1);
dcv = fmd_alloc(sizeof (dcp) * (mp->mod_dictc + 1), FMD_SLEEP);
bcopy(mp->mod_dictv, dcv, sizeof (dcp) * mp->mod_dictc);
fmd_free(mp->mod_dictv, sizeof (dcp) * mp->mod_dictc);
mp->mod_dictv = dcv;
mp->mod_dictv[mp->mod_dictc++] = dcp;
len = fm_dc_codelen(dcp);
mp->mod_codelen = MAX(mp->mod_codelen, len);
return (0);
}
int
fmd_module_dc_key2code(fmd_module_t *mp,
char *const keys[], char *code, size_t codelen)
{
int i, err;
for (i = 0; i < mp->mod_dictc; i++) {
if ((err = fm_dc_key2code(mp->mod_dictv[i], (const char **)keys,
code, codelen)) == 0 || errno != ENOMSG)
return (err);
}
return (fmd_set_errno(ENOMSG));
}
fmd_modhash_t *
fmd_modhash_create(void)
{
fmd_modhash_t *mhp = fmd_alloc(sizeof (fmd_modhash_t), FMD_SLEEP);
(void) pthread_rwlock_init(&mhp->mh_lock, NULL);
mhp->mh_hashlen = fmd.d_str_buckets;
mhp->mh_hash = fmd_zalloc(sizeof (void *) * mhp->mh_hashlen, FMD_SLEEP);
mhp->mh_nelems = 0;
return (mhp);
}
void
fmd_modhash_destroy(fmd_modhash_t *mhp)
{
fmd_module_t *mp, *nmp;
uint_t i;
for (i = 0; i < mhp->mh_hashlen; i++) {
for (mp = mhp->mh_hash[i]; mp != NULL; mp = nmp) {
nmp = mp->mod_next;
mp->mod_next = NULL;
fmd_module_rele(mp);
}
}
fmd_free(mhp->mh_hash, sizeof (void *) * mhp->mh_hashlen);
(void) pthread_rwlock_destroy(&mhp->mh_lock);
fmd_free(mhp, sizeof (fmd_modhash_t));
}
static void
fmd_modhash_loaddir(fmd_modhash_t *mhp, const char *dir,
const fmd_modops_t *ops, const char *suffix)
{
char path[PATH_MAX];
struct dirent *dp;
const char *p;
DIR *dirp;
if ((dirp = opendir(dir)) == NULL)
return;
while ((dp = readdir(dirp)) != NULL) {
if (dp->d_name[0] == '.')
continue;
p = strrchr(dp->d_name, '.');
if (p != NULL && strcmp(p, ".conf") == 0)
continue;
if (suffix != NULL && (p == NULL || strcmp(p, suffix) != 0))
continue;
(void) snprintf(path, sizeof (path), "%s/%s", dir, dp->d_name);
(void) fmd_modhash_load(mhp, path, ops);
}
(void) closedir(dirp);
}
void
fmd_modhash_loadall(fmd_modhash_t *mhp, const fmd_conf_path_t *pap,
const fmd_modops_t *ops, const char *suffix)
{
int i;
for (i = 0; i < pap->cpa_argc; i++)
fmd_modhash_loaddir(mhp, pap->cpa_argv[i], ops, suffix);
}
void
fmd_modhash_apply(fmd_modhash_t *mhp, void (*func)(fmd_module_t *))
{
fmd_module_t *mp, *np;
uint_t i;
(void) pthread_rwlock_rdlock(&mhp->mh_lock);
for (i = 0; i < mhp->mh_hashlen; i++) {
for (mp = mhp->mh_hash[i]; mp != NULL; mp = np) {
np = mp->mod_next;
func(mp);
}
}
(void) pthread_rwlock_unlock(&mhp->mh_lock);
}
void
fmd_modhash_tryapply(fmd_modhash_t *mhp, void (*func)(fmd_module_t *))
{
fmd_module_t *mp, *np;
uint_t i;
if (mhp == NULL || pthread_rwlock_tryrdlock(&mhp->mh_lock) != 0)
return;
for (i = 0; i < mhp->mh_hashlen; i++) {
for (mp = mhp->mh_hash[i]; mp != NULL; mp = np) {
np = mp->mod_next;
func(mp);
}
}
(void) pthread_rwlock_unlock(&mhp->mh_lock);
}
void
fmd_modhash_dispatch(fmd_modhash_t *mhp, fmd_event_t *ep)
{
fmd_module_t *mp;
uint_t i;
fmd_event_hold(ep);
(void) pthread_rwlock_rdlock(&mhp->mh_lock);
for (i = 0; i < mhp->mh_hashlen; i++) {
for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
(void) pthread_mutex_lock(&mp->mod_lock);
if ((mp->mod_flags & (FMD_MOD_INIT | FMD_MOD_FINI |
FMD_MOD_QUIT)) == FMD_MOD_INIT && !mp->mod_error) {
if (FMD_EVENT_TYPE(ep) == FMD_EVT_TOPO)
fmd_eventq_drop_topo(mp->mod_queue);
fmd_eventq_insert_at_time(mp->mod_queue, ep);
}
(void) pthread_mutex_unlock(&mp->mod_lock);
}
}
(void) pthread_rwlock_unlock(&mhp->mh_lock);
fmd_event_rele(ep);
}
fmd_module_t *
fmd_modhash_lookup(fmd_modhash_t *mhp, const char *name)
{
fmd_module_t *mp;
uint_t h;
(void) pthread_rwlock_rdlock(&mhp->mh_lock);
h = fmd_strhash(name) % mhp->mh_hashlen;
for (mp = mhp->mh_hash[h]; mp != NULL; mp = mp->mod_next) {
if (strcmp(name, mp->mod_name) == 0)
break;
}
if (mp != NULL)
fmd_module_hold(mp);
else
(void) fmd_set_errno(EFMD_MOD_NOMOD);
(void) pthread_rwlock_unlock(&mhp->mh_lock);
return (mp);
}
fmd_module_t *
fmd_modhash_load(fmd_modhash_t *mhp, const char *path, const fmd_modops_t *ops)
{
char name[PATH_MAX], *p;
fmd_module_t *mp;
int tries = 0;
uint_t h;
(void) strlcpy(name, fmd_strbasename(path), sizeof (name));
if ((p = strrchr(name, '.')) != NULL && strcmp(p, ".so") == 0)
*p = '\0';
(void) pthread_rwlock_wrlock(&mhp->mh_lock);
h = fmd_strhash(name) % mhp->mh_hashlen;
for (mp = mhp->mh_hash[h]; mp != NULL; mp = mp->mod_next) {
if (strcmp(name, mp->mod_name) == 0)
break;
}
if (mp != NULL) {
(void) pthread_rwlock_unlock(&mhp->mh_lock);
(void) fmd_set_errno(EFMD_MOD_LOADED);
return (NULL);
}
while ((mp = fmd_module_create(path, ops)) == NULL) {
if (tries++ != 0 || errno != EFMD_CKPT_INVAL) {
(void) pthread_rwlock_unlock(&mhp->mh_lock);
return (NULL);
}
}
mp->mod_hash = mhp;
mp->mod_next = mhp->mh_hash[h];
mhp->mh_hash[h] = mp;
mhp->mh_nelems++;
(void) pthread_rwlock_unlock(&mhp->mh_lock);
return (mp);
}
int
fmd_modhash_unload(fmd_modhash_t *mhp, const char *name)
{
fmd_module_t *mp, **pp;
uint_t h;
(void) pthread_rwlock_wrlock(&mhp->mh_lock);
h = fmd_strhash(name) % mhp->mh_hashlen;
pp = &mhp->mh_hash[h];
for (mp = *pp; mp != NULL; mp = mp->mod_next) {
if (strcmp(name, mp->mod_name) == 0)
break;
else
pp = &mp->mod_next;
}
if (mp == NULL) {
(void) pthread_rwlock_unlock(&mhp->mh_lock);
return (fmd_set_errno(EFMD_MOD_NOMOD));
}
*pp = mp->mod_next;
mp->mod_next = NULL;
ASSERT(mhp->mh_nelems != 0);
mhp->mh_nelems--;
(void) pthread_rwlock_unlock(&mhp->mh_lock);
fmd_module_unload(mp);
fmd_module_rele(mp);
return (0);
}
void
fmd_modstat_publish(fmd_module_t *mp)
{
(void) pthread_mutex_lock(&mp->mod_lock);
ASSERT(mp->mod_flags & FMD_MOD_STSUB);
mp->mod_flags |= FMD_MOD_STPUB;
(void) pthread_cond_broadcast(&mp->mod_cv);
while (mp->mod_flags & FMD_MOD_STPUB)
(void) pthread_cond_wait(&mp->mod_cv, &mp->mod_lock);
(void) pthread_mutex_unlock(&mp->mod_lock);
}
int
fmd_modstat_snapshot(fmd_module_t *mp, fmd_ustat_snap_t *uss)
{
fmd_event_t *e;
int err;
(void) pthread_mutex_lock(&mp->mod_lock);
while (mp->mod_error == 0 && (mp->mod_flags & FMD_MOD_STSUB))
(void) pthread_cond_wait(&mp->mod_cv, &mp->mod_lock);
if (mp->mod_error != 0) {
(void) pthread_mutex_unlock(&mp->mod_lock);
return (fmd_set_errno(EFMD_HDL_ABORT));
}
mp->mod_flags |= FMD_MOD_STSUB;
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
e = fmd_event_create(FMD_EVT_STATS, FMD_HRT_NOW, NULL, NULL);
fmd_eventq_insert_at_head(mp->mod_queue, e);
(void) pthread_mutex_lock(&mp->mod_lock);
while (mp->mod_error == 0 && !(mp->mod_flags & FMD_MOD_STPUB)) {
struct timespec tms;
(void) pthread_cond_wait(&mp->mod_cv, &mp->mod_lock);
(void) pthread_mutex_unlock(&mp->mod_lock);
tms.tv_sec = 0;
tms.tv_nsec = 10000000;
(void) nanosleep(&tms, NULL);
(void) pthread_mutex_lock(&mp->mod_lock);
}
if (mp->mod_error != 0) {
(void) pthread_mutex_unlock(&mp->mod_lock);
return (fmd_set_errno(EFMD_HDL_ABORT));
}
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
(void) pthread_mutex_lock(&mp->mod_stats_lock);
if (mp->mod_stats != NULL) {
mp->mod_stats->ms_snaptime.fmds_value.ui64 = gethrtime();
err = fmd_ustat_snapshot(mp->mod_ustat, uss);
} else
err = fmd_set_errno(EFMD_HDL_ABORT);
(void) pthread_mutex_unlock(&mp->mod_stats_lock);
(void) pthread_mutex_lock(&mp->mod_lock);
ASSERT(mp->mod_flags & FMD_MOD_STSUB);
ASSERT(mp->mod_flags & FMD_MOD_STPUB);
mp->mod_flags &= ~(FMD_MOD_STSUB | FMD_MOD_STPUB);
(void) pthread_cond_broadcast(&mp->mod_cv);
(void) pthread_mutex_unlock(&mp->mod_lock);
return (err);
}
struct topo_hdl *
fmd_module_topo_hold(fmd_module_t *mp)
{
fmd_modtopo_t *mtp;
ASSERT(fmd_module_locked(mp));
mtp = fmd_zalloc(sizeof (fmd_modtopo_t), FMD_SLEEP);
mtp->mt_topo = mp->mod_topo_current;
fmd_topo_addref(mtp->mt_topo);
fmd_list_prepend(&mp->mod_topolist, mtp);
return (mtp->mt_topo->ft_hdl);
}
int
fmd_module_topo_rele(fmd_module_t *mp, struct topo_hdl *hdl)
{
fmd_modtopo_t *mtp;
ASSERT(fmd_module_locked(mp));
for (mtp = fmd_list_next(&mp->mod_topolist); mtp != NULL;
mtp = fmd_list_next(mtp)) {
if (mtp->mt_topo->ft_hdl == hdl)
break;
}
if (mtp == NULL)
return (-1);
fmd_list_delete(&mp->mod_topolist, mtp);
fmd_topo_rele(mtp->mt_topo);
fmd_free(mtp, sizeof (fmd_modtopo_t));
return (0);
}