#include "mt.h"
#include "rpc_mt.h"
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <rpc/rpc.h>
#include <errno.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <syslog.h>
#include <thread.h>
#include <assert.h>
#include <libintl.h>
#include <values.h>
/* Message domain handed to dgettext() when logging from this file. */
extern const char __nsl_dom[];
/*
 * Helpers implemented outside this file.  Prototypes are partly
 * old-style (empty parameter lists), so argument checking is limited.
 */
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t __is_a_userfd(int P_fd);
extern void __destroy_userfd();
extern void clear_pollfd(int);
extern void set_pollfd(int , short );
extern void svc_getreq_poll();
/* Optional hook; _svc_run_mt() calls it after a dispatch when non-NULL. */
extern void (*__proc_cleanup_cb)();
/* Forward declarations for the functions defined later in this file. */
static void start_threads();
static void create_pipe();
static void clear_pipe();
static int select_next_pollfd();
static SVCXPRT *make_xprt_copy();
static void _svc_run_mt();
static void _svc_run();
int _svc_prog_dispatch();
static void _svc_done_private();
/* Read/write lock taken in this file around svc_pollfd access. */
extern rwlock_t svc_fd_lock;
/*
 * Door-transport state: svc_run() sleeps on svc_door_waitcv (under
 * svc_door_mutex) while svc_ndoorfds > 0 and there is nothing to poll;
 * svc_exit() signals the condition variable.
 */
extern mutex_t svc_door_mutex;
extern cond_t svc_door_waitcv;
extern int svc_ndoorfds;
extern void __svc_cleanup_door_xprts();
extern void __svc_free_xprtlist();
extern void __svc_getreq_user(struct pollfd *);
/*
 * Tunables read and written through rpc_control():
 *   __rpc_connmaxrec   RPC_SVC_CONNMAXREC_SET / _GET
 *   __rpc_irtimeout    RPC_SVC_IRTIMEOUT_SET (units not shown here)
 *   __rpc_tp_exclbind  __RPC_SVC_EXCLBIND_SET / _GET
 *   __svc_lstnbklog    __RPC_SVC_LSTNBKLOG_SET / _GET
 */
int __rpc_connmaxrec = 0;
int __rpc_irtimeout = 35;
bool_t __rpc_tp_exclbind = FALSE;
static int __svc_lstnbklog = 64;
/* Threading mode; svc_run() uses it to pick _svc_run() or _svc_run_mt(). */
int svc_mt_mode;
/*
 * Wake-up pipe made by create_pipe().  The read end is added to the poll
 * set in _svc_run_mt(); a byte is written to the write end to interrupt
 * a thread blocked in poll().
 */
int svc_pipe[2];
/* Upper bound on svc_thr_total when threads are created automatically. */
static int svc_thr_max = 16;
/* Number of threads currently running _svc_run_mt(). */
static int svc_thr_total;
/* Threads currently processing a request (counted in MT_AUTO mode only). */
static int svc_thr_active;
/*
 * Ring buffer of descriptors that still have requests queued
 * (filled by svc_args_done() on XPRT_MOREREQS, drained by _svc_run_mt()).
 * Indices wrap to 0 once they exceed CIRCULAR_BUFSIZE.
 */
#define CIRCULAR_BUFSIZE 1024
static int svc_pending_fds[CIRCULAR_BUFSIZE+1];
static int svc_next_pending;
static int svc_last_pending;
static int svc_total_pending;
/* Thread creation statistics reported through rpc_control(). */
static int svc_thr_total_creates;
static int svc_thr_total_create_errors;
/* Threads currently blocked in cond_wait() on svc_thr_fdwait. */
static int svc_waiters;
/*
 * Descriptor bookkeeping shared with other parts of the library.  Within
 * this file only svc_npollfds, svc_npollfds_set and svc_max_pollfd are
 * read; none of them is modified here.
 */
int svc_nfds;
int svc_nfds_set;
int svc_max_fd = 0;
int svc_npollfds;
int svc_npollfds_set;
int svc_max_pollfd;
int svc_pollfd_allocd;
/* svc_pollset grows in steps of POLLSET_EXTEND entries (see alloc_pollset). */
#define POLLSET_EXTEND 256
static int svc_pollset_allocd;
static struct pollfd *svc_pollset;
/*
 * Results of the most recent poll() in _svc_run_mt():
 *   svc_polled       entries in svc_pollset, not counting the pipe entry
 *   svc_pollfds      entries with events that have not been handed out yet
 *   svc_next_pollfd  index at which select_next_pollfd() resumes scanning
 */
static int svc_polled;
static int svc_pollfds;
static int svc_next_pollfd;
/* TRUE while a thread in _svc_run_mt() is preparing for or blocked in poll(). */
bool_t svc_polling;
/*
 * TRUE until svc_run() starts and again once svc_exit() has run; the run
 * loops test it to decide when to stop.
 */
static bool_t svc_exit_done = TRUE;
/*
 * Main server loop.  Runs until svc_exit() has been called or there is
 * neither a pollable descriptor nor a door descriptor left to serve.
 *
 * While pollable descriptors exist, control is handed to the single- or
 * multi-threaded poll loop depending on svc_mt_mode.  When only door
 * descriptors remain, the caller sleeps on svc_door_waitcv until it is
 * signalled.
 */
void
svc_run(void)
{
	svc_exit_done = FALSE;

	for (;;) {
		/* Nothing left to serve at all. */
		if (svc_npollfds <= 0 && svc_ndoorfds <= 0)
			break;
		if (svc_exit_done)
			break;

		if (svc_npollfds > 0) {
			if (svc_mt_mode == RPC_SVC_MT_NONE)
				_svc_run();
			else
				_svc_run_mt();
			continue;
		}

		/* Only doors are registered: block until woken up. */
		(void) mutex_lock(&svc_door_mutex);
		if (svc_ndoorfds > 0)
			(void) cond_wait(&svc_door_waitcv, &svc_door_mutex);
		(void) mutex_unlock(&svc_door_mutex);
	}
}
/*
 * Ask svc_run() to return.
 *
 * Under svc_exit_mutex: marks the server as exiting (a second call is a
 * no-op), destroys every transport found in svc_xports[0..svc_max_pollfd),
 * and releases the cached transport list and door transports.  Afterwards
 * it wakes everything that may be blocked:
 *   - in MT modes, threads waiting on svc_thr_fdwait and the thread
 *     blocked in poll() (via a byte written to the wake-up pipe);
 *   - svc_run() waiting on svc_door_waitcv.
 * Finally the user file descriptor state is torn down.
 */
void
svc_exit(void)
{
	SVCXPRT *xprt;
	int fd;
	/*
	 * Only the arrival of a byte matters to the poller, but give it a
	 * defined value instead of writing uninitialized stack contents.
	 */
	char dummy = 0;

	(void) mutex_lock(&svc_exit_mutex);
	if (svc_exit_done) {
		(void) mutex_unlock(&svc_exit_mutex);
		return;
	}
	svc_exit_done = TRUE;
	for (fd = 0; fd < svc_max_pollfd; fd++) {
		xprt = svc_xports[fd];
		if (xprt) {
			SVC_DESTROY(xprt);
		}
	}
	__svc_free_xprtlist();
	__svc_cleanup_door_xprts();
	(void) mutex_unlock(&svc_exit_mutex);

	if (svc_mt_mode != RPC_SVC_MT_NONE) {
		/* Wake idle worker threads, then the polling thread. */
		(void) mutex_lock(&svc_mutex);
		(void) cond_broadcast(&svc_thr_fdwait);
		(void) mutex_unlock(&svc_mutex);
		(void) write(svc_pipe[1], &dummy, sizeof (dummy));
	}

	/* Wake svc_run() if it is sleeping because only doors are left. */
	(void) mutex_lock(&svc_door_mutex);
	(void) cond_signal(&svc_door_waitcv);
	(void) mutex_unlock(&svc_door_mutex);

	__destroy_userfd();
}
/*
 * Make sure svc_pollset has room for at least npollfds entries.
 *
 * The array grows in multiples of POLLSET_EXTEND and never shrinks.
 * Returns 0 on success and -1 (after logging) if memory cannot be
 * obtained.  On failure both svc_pollset and svc_pollset_allocd are left
 * untouched, so the recorded capacity always matches the real buffer and
 * a later call will retry the allocation instead of trusting a size that
 * was never obtained.
 */
static int
alloc_pollset(int npollfds)
{
	pollfd_t *grown;
	int new_allocd;

	if (npollfds <= svc_pollset_allocd)
		return (0);

	/* Work on a local copy; commit only after realloc() succeeds. */
	new_allocd = svc_pollset_allocd;
	do {
		new_allocd += POLLSET_EXTEND;
	} while (npollfds > new_allocd);

	grown = realloc(svc_pollset, sizeof (pollfd_t) * new_allocd);
	if (grown == NULL) {
		syslog(LOG_ERR, "alloc_pollset: out of memory");
		return (-1);
	}
	svc_pollset = grown;
	svc_pollset_allocd = new_allocd;
	return (0);
}
/*
 * Single-threaded service loop (svc_mt_mode == RPC_SVC_MT_NONE).
 *
 * SIGALRM is blocked for the whole loop except while the thread sits in
 * poll(); the caller's signal mask is restored before returning.  Each
 * pass rebuilds svc_pollset from svc_pollfd under svc_fd_lock, polls it
 * without a timeout and passes the result to svc_getreq_poll().
 *
 * The loop ends when svc_exit() has been called, when there is nothing
 * left to poll, or when the poll set cannot be allocated.
 */
static void
_svc_run(void)
{
	sigset_t set, oldset;
	int npollfds;
	int nready;

	(void) sigemptyset(&set);
	(void) sigaddset(&set, SIGALRM);
	(void) sigprocmask(SIG_BLOCK, &set, &oldset);

	while (!svc_exit_done) {
		(void) rw_rdlock(&svc_fd_lock);
		if (alloc_pollset(svc_npollfds) == -1) {
			/* Do not leave the loop with the read lock held. */
			(void) rw_unlock(&svc_fd_lock);
			break;
		}
		npollfds = __rpc_compress_pollfd(svc_max_pollfd,
		    svc_pollfd, svc_pollset);
		(void) rw_unlock(&svc_fd_lock);
		if (npollfds == 0)
			break;

		/* Let SIGALRM through only while waiting for input. */
		(void) sigprocmask(SIG_SETMASK, &oldset, NULL);
		nready = poll(svc_pollset, npollfds, -1);
		(void) sigprocmask(SIG_BLOCK, &set, &oldset);

		switch (nready) {
		case -1:
			/* poll() failed (e.g. interrupted): just try again. */
		case 0:
			continue;
		default:
			svc_getreq_poll(svc_pollset, nready);
		}
	}

	(void) sigprocmask(SIG_SETMASK, &oldset, NULL);
}
/*
 * Special values for the "index into svc_pollset" that accompanies a
 * selected descriptor.  Both are negative, so they cannot be confused
 * with a real array index.
 *   INVALID_POLLFD   no descriptor has been selected
 *   FD_FROM_PENDING  the descriptor came from svc_pending_fds rather than
 *                    from svc_pollset (value is INVALID_POLLFD + 1)
 */
enum {
INVALID_POLLFD = -200,
FD_FROM_PENDING
};
/*
 * Multi-threaded service loop, run by the thread that called svc_run()
 * and by every thread created in start_threads().
 *
 * The very first invocation marks its caller as the "main" thread, sets
 * svc_thr_total to 1, resets the pending-fd ring and creates the wake-up
 * pipe.  Every thread then loops:
 *   1. with svc_thr_mutex and svc_mutex held, pick a descriptor: first
 *      from the pending ring, otherwise from the results of the last
 *      poll(), otherwise become the poller and call poll() itself;
 *   2. in MT_AUTO mode, create extra threads if more work is queued;
 *   3. obtain a private copy of the transport, drop both locks, receive
 *      the request and dispatch it;
 *   4. hand the copy back (in MT_AUTO mode, or when nothing was
 *      dispatched) and possibly retire the thread.
 *
 * Non-main threads leave through thr_exit(); the main thread leaves by
 * breaking out of the loop and returning to svc_run().
 */
static void
_svc_run_mt(void)
{
int npollfds;
int n_polled, dispatch;
/* NOTE(review): first_time is tested and cleared without holding a lock. */
static bool_t first_time = TRUE;
bool_t main_thread = FALSE;
int n_new;
int myfd, mypollfd;
SVCXPRT *parent_xprt, *xprt;
if (first_time) {
first_time = FALSE;
main_thread = TRUE;
svc_thr_total = 1;
svc_next_pending = svc_last_pending = 0;
create_pipe();
}
if (svc_exit_done)
return;
for (;;) {
/* Lock order used throughout this function: svc_thr_mutex, then svc_mutex. */
(void) mutex_lock(&svc_thr_mutex);
(void) mutex_lock(&svc_mutex);
/* Re-entry point for paths that still hold both mutexes. */
continue_with_locks:
myfd = -1;
mypollfd = INVALID_POLLFD;
/* Descriptors queued by svc_args_done() are served first. */
if (svc_total_pending > 0) {
myfd = svc_pending_fds[svc_next_pending++];
mypollfd = FD_FROM_PENDING;
if (svc_next_pending > CIRCULAR_BUFSIZE)
svc_next_pending = 0;
svc_total_pending--;
}
/*
 * Nothing pending and no unserved results from an earlier poll():
 * this thread takes on the polling role.
 */
if (myfd == -1 && svc_pollfds == 0) {
svc_polling = TRUE;
(void) rw_rdlock(&svc_fd_lock);
/*
 * One slot more than svc_npollfds is requested so the pipe's read end
 * can be appended below.  With no descriptors registered, or on
 * allocation failure, this thread gives up: it is removed from the
 * thread count and exits (the main thread returns instead).
 */
if (svc_npollfds == 0 ||
alloc_pollset(svc_npollfds + 1) == -1) {
(void) rw_unlock(&svc_fd_lock);
svc_polling = FALSE;
svc_thr_total--;
(void) mutex_unlock(&svc_mutex);
(void) mutex_unlock(&svc_thr_mutex);
if (!main_thread) {
thr_exit(NULL);
}
break;
}
npollfds = __rpc_compress_pollfd(svc_max_pollfd,
svc_pollfd, svc_pollset);
(void) rw_unlock(&svc_fd_lock);
/*
 * Descriptors are registered but none ended up in the compressed set.
 * svc_thr_mutex is released so another thread can proceed.
 */
if (npollfds == 0) {
svc_polling = FALSE;
(void) mutex_unlock(&svc_thr_mutex);
/* Another thread is already waiting; an extra waiter is not needed. */
if ((!main_thread) && svc_waiters > 0) {
svc_thr_total--;
(void) mutex_unlock(&svc_mutex);
thr_exit(NULL);
}
/*
 * Sleep on svc_thr_fdwait (releasing svc_mutex while asleep) until a
 * descriptor is enabled for polling, work shows up, or svc_exit() runs.
 */
while (svc_npollfds_set == 0 &&
svc_pollfds == 0 &&
svc_total_pending == 0 &&
!svc_exit_done) {
svc_waiters++;
(void) cond_wait(&svc_thr_fdwait,
&svc_mutex);
svc_waiters--;
}
if (svc_exit_done) {
svc_thr_total--;
(void) mutex_unlock(&svc_mutex);
if (!main_thread)
thr_exit(NULL);
break;
}
/* Both mutexes are released here; the loop top re-acquires them in order. */
(void) mutex_unlock(&svc_mutex);
continue;
}
/* Append the wake-up pipe as the last entry of the poll set. */
svc_pollset[npollfds].fd = svc_pipe[0];
svc_pollset[npollfds].events = MASKVAL;
/*
 * Poll with svc_mutex dropped (svc_thr_mutex stays held, so only one
 * thread polls at a time).  Repeat until at least one usable event is
 * left after invalid descriptors have been weeded out.
 */
do {
int i, j;
(void) mutex_unlock(&svc_mutex);
n_polled = poll(svc_pollset, npollfds + 1, -1);
(void) mutex_lock(&svc_mutex);
/* Error or nothing ready: fall through to the loop test and retry. */
if (n_polled <= 0)
continue;
/*
 * Drop every entry that reported POLLNVAL: shift the tail (including
 * the pipe entry) down by one, clear the vacated last slot, and
 * re-examine index i, which now holds the next entry.
 */
for (i = 0; i <= npollfds; i++) {
if (svc_pollset[i].revents & POLLNVAL) {
for (j = i; j < npollfds; j++)
svc_pollset[j] =
svc_pollset[j + 1];
(void) memset(&svc_pollset[j],
0, sizeof (struct pollfd));
npollfds--;
n_polled--;
i--;
}
}
} while (n_polled <= 0);
svc_polling = FALSE;
/* An event on the pipe is only a wake-up: drain it and do not count it. */
if (svc_pollset[npollfds].revents) {
clear_pipe();
n_polled--;
svc_pollset[npollfds].revents = 0;
}
/* Publish the poll results for select_next_pollfd(). */
svc_polled = npollfds;
svc_pollfds = n_polled;
svc_next_pollfd = 0;
if (svc_exit_done) {
svc_thr_total--;
(void) mutex_unlock(&svc_mutex);
(void) mutex_unlock(&svc_thr_mutex);
if (!main_thread) {
thr_exit(NULL);
}
break;
}
/* Only the pipe fired: start over and re-evaluate what to do. */
if (svc_pollfds == 0)
goto continue_with_locks;
}
/* No pending fd was taken above: use the next ready poll entry. */
if (myfd == -1) {
if (select_next_pollfd(&myfd, &mypollfd) == -1)
goto continue_with_locks;
}
/*
 * MT_AUTO: create more threads when there is more outstanding work
 * (unserved poll results plus pending fds, plus two) than threads that
 * are not currently processing a request, capped by svc_thr_max.
 */
if (svc_thr_total < svc_thr_max &&
svc_mt_mode == RPC_SVC_MT_AUTO && !svc_exit_done) {
n_new = 1 + 1 + svc_pollfds + svc_total_pending -
(svc_thr_total - svc_thr_active);
if (n_new > (svc_thr_max - svc_thr_total))
n_new = svc_thr_max - svc_thr_total;
if (n_new > 0)
start_threads(n_new);
}
parent_xprt = svc_xports[myfd];
/*
 * No transport registered for this descriptor; it may be a user
 * descriptor, which is handled by __svc_getreq_user().
 * NOTE(review): if myfd came from the pending ring, mypollfd is
 * FD_FROM_PENDING and would be an invalid index here - confirm that
 * pending descriptors are never user descriptors.
 */
if (parent_xprt == NULL) {
if (__is_a_userfd(myfd) == TRUE)
__svc_getreq_user(&(svc_pollset[mypollfd]));
goto continue_with_locks;
}
/* Skip transports already marked defunct or failed. */
if (svc_defunct(parent_xprt) || svc_failed(parent_xprt))
goto continue_with_locks;
/* Work on a private copy; on failure pick another descriptor. */
if ((xprt = make_xprt_copy(parent_xprt)) == NULL)
goto continue_with_locks;
if (svc_mt_mode == RPC_SVC_MT_AUTO)
svc_thr_active++;
/* Both locks are released while the request is received and served. */
(void) mutex_unlock(&svc_mutex);
(void) mutex_unlock(&svc_thr_mutex);
{
struct rpc_msg *msg;
struct svc_req *r;
char *cred_area;
msg = SVCEXT(xprt)->msg;
r = SVCEXT(xprt)->req;
cred_area = SVCEXT(xprt)->cred_area;
/*
 * Carve the transport's credential area into three consecutive
 * MAX_AUTH_BYTES-sized regions: credentials, verifier, client cred.
 */
msg->rm_call.cb_cred.oa_base = cred_area;
msg->rm_call.cb_verf.oa_base =
&(cred_area[MAX_AUTH_BYTES]);
r->rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);
/*
 * On a successful receive, flag the copy so svc_args_done() is called
 * later if it has not been called by then, dispatch the request and run
 * the optional cleanup hook.  On a failed receive, svc_args_done() is
 * called right away.
 */
if ((dispatch = SVC_RECV(xprt, msg))) {
if (svc_mt_mode != RPC_SVC_MT_NONE)
svc_flags(xprt) |= SVC_ARGS_CHECK;
dispatch = _svc_prog_dispatch(xprt, msg, r);
if (__proc_cleanup_cb != NULL)
(*__proc_cleanup_cb)(xprt);
} else
svc_args_done(xprt);
/*
 * In MT_AUTO mode, or when nothing was dispatched, this thread returns
 * the transport copy itself.  Otherwise the copy is not released here;
 * svc_done() performs the same steps when svc_mt_mode is
 * RPC_SVC_MT_USER.
 */
if (svc_mt_mode == RPC_SVC_MT_AUTO || !dispatch) {
if (svc_flags(xprt) & SVC_ARGS_CHECK)
svc_args_done(xprt);
(void) mutex_lock(&svc_mutex);
_svc_done_private(xprt);
if (svc_mt_mode == RPC_SVC_MT_AUTO) {
svc_thr_active--;
/*
 * Retire this (non-main) thread when no work is queued and some other
 * thread is already polling or waiting.  If every remaining thread is
 * a waiter, wake them so one of them can resume polling.
 */
if (!main_thread &&
svc_pollfds <= 0 &&
svc_total_pending <= 0 &&
(svc_polling ||
svc_waiters > 0)) {
svc_thr_total--;
if (svc_thr_total ==
svc_waiters) {
(void) cond_broadcast(
&svc_thr_fdwait);
}
(void) mutex_unlock(&svc_mutex);
thr_exit(NULL);
}
}
(void) mutex_unlock(&svc_mutex);
}
}
}
}
/*
 * Thread start routine with the exact signature thr_create() requires.
 *
 * _svc_run_mt() takes no argument and returns nothing, so calling it
 * through a pointer cast to "void *(*)(void *)" is a call through an
 * incompatible function type.  This wrapper adapts the signature
 * instead.
 */
static void *
svc_run_mt_thread(void *arg)
{
	(void) arg;
	_svc_run_mt();
	return (NULL);
}

/*
 * Try to create num_threads detached threads running the multi-threaded
 * service loop.  The caller must hold svc_mutex.
 *
 * Each successful creation bumps svc_thr_total and svc_thr_total_creates;
 * each failure bumps svc_thr_total_create_errors.  Failures are not
 * reported to the caller.
 */
static void
start_threads(int num_threads)
{
	int i;

	assert(MUTEX_HELD(&svc_mutex));

	for (i = 0; i < num_threads; i++) {
		if (thr_create(NULL, 0, svc_run_mt_thread, NULL,
		    THR_DETACHED, NULL) == 0) {
			svc_thr_total++;
			svc_thr_total_creates++;
		} else {
			svc_thr_total_create_errors++;
		}
	}
}
/*
 * Create the wake-up pipe in svc_pipe and switch both ends to
 * non-blocking mode (read end first, then write end).  Any failure is
 * logged and terminates the process with exit status 1.
 */
static void
create_pipe(void)
{
	int end;

	if (pipe(svc_pipe) == -1) {
		syslog(LOG_ERR, dgettext(__nsl_dom,
		    "RPC: svc could not create pipe - exiting"));
		exit(1);
	}

	for (end = 0; end < 2; end++) {
		if (fcntl(svc_pipe[end], F_SETFL, O_NONBLOCK) != -1)
			continue;
		syslog(LOG_ERR, dgettext(__nsl_dom,
		    "RPC: svc pipe error - exiting"));
		exit(1);
	}
}
/*
 * Drain the read end of the wake-up pipe.  Reads 16 bytes at a time and
 * stops at the first read that does not fill the buffer completely
 * (short read, end of data, or error).
 */
static void
clear_pipe(void)
{
	char scratch[16];
	int nread;

	for (;;) {
		nread = read(svc_pipe[0], scratch, sizeof (scratch));
		if (nread != sizeof (scratch))
			break;
	}
}
/*
 * Hand out the next descriptor that reported an event in the last poll().
 * The caller must hold both svc_thr_mutex and svc_mutex.
 *
 * Scanning resumes at svc_next_pollfd.  On success the descriptor is
 * stored in *fd, its index within svc_pollset in *pollfdIndex,
 * svc_pollfds is decremented and 0 is returned.  When no further entry
 * has events, the scan state is reset, *fd is set to -1, *pollfdIndex to
 * INVALID_POLLFD, and -1 is returned.
 */
static int
select_next_pollfd(int *fd, int *pollfdIndex)
{
	int slot;

	assert(MUTEX_HELD(&svc_thr_mutex));
	assert(MUTEX_HELD(&svc_mutex));

	slot = svc_next_pollfd;
	while (svc_pollfds > 0 && slot < svc_polled) {
		if (svc_pollset[slot].revents == 0) {
			slot++;
			continue;
		}
		svc_pollfds--;
		svc_next_pollfd = slot + 1;
		*fd = svc_pollset[slot].fd;
		*pollfdIndex = slot;
		return (0);
	}

	/* Nothing left: reset so the next caller polls again. */
	svc_pollfds = 0;
	svc_next_pollfd = 0;
	*fd = -1;
	*pollfdIndex = INVALID_POLLFD;
	return (-1);
}
/*
 * Obtain a working copy of transport "parent".  The caller must hold
 * svc_mutex.
 *
 * A copy cached on the parent's list (my_xlist->next) is reused when one
 * exists; it is unlinked and given the parent's current flags.
 * Otherwise a new copy is made with svc_copy().  On success the parent's
 * reference count is incremented and the parent's descriptor is removed
 * from the poll set with clear_pollfd().
 *
 * Returns the copy, or NULL if none could be obtained.
 */
static SVCXPRT *
make_xprt_copy(SVCXPRT *parent)
{
	SVCXPRT_LIST *head = SVCEXT(parent)->my_xlist;
	SVCXPRT_LIST *cached;
	SVCXPRT *copy;
	int parent_fd = parent->xp_fd;

	assert(MUTEX_HELD(&svc_mutex));

	cached = head->next;
	if (cached != NULL) {
		/* Unlink the first cached copy and reuse it. */
		head->next = cached->next;
		cached->next = NULL;
		copy = cached->xprt;
		svc_flags(copy) = svc_flags(parent);
	} else {
		copy = svc_copy(parent);
	}

	if (copy == NULL)
		return (NULL);

	SVCEXT(parent)->refcnt++;
	(void) rw_wrlock(&svc_fd_lock);
	clear_pollfd(parent_fd);
	(void) rw_unlock(&svc_fd_lock);
	return (copy);
}
/*
 * Return a transport copy to its parent.  The caller must hold
 * svc_mutex.
 *
 * A transport without a parent is left alone.  Otherwise the copy's list
 * node is pushed onto the front of the parent's list, the parent's
 * reference count is decremented and the parent's flags are merged into
 * the copy.  If the copy is then failed or defunct, those two flags are
 * propagated to the parent, and once the reference count has dropped to
 * zero _svc_destroy_private() is called on the copy.
 */
static void
_svc_done_private(SVCXPRT *xprt)
{
	SVCXPRT *owner;
	SVCXPRT_LIST *free_head;
	SVCXPRT_LIST *node;

	assert(MUTEX_HELD(&svc_mutex));

	owner = SVCEXT(xprt)->parent;
	if (owner == NULL)
		return;

	/* Push this copy's node onto the front of the owner's list. */
	free_head = SVCEXT(owner)->my_xlist;
	node = SVCEXT(xprt)->my_xlist;
	node->next = free_head->next;
	free_head->next = node;

	SVCEXT(owner)->refcnt--;

	svc_flags(xprt) |= svc_flags(owner);
	if (!svc_failed(xprt) && !svc_defunct(xprt))
		return;

	svc_flags(owner) |= (svc_flags(xprt) & (SVC_FAILED | SVC_DEFUNCT));
	if (SVCEXT(owner)->refcnt == 0)
		_svc_destroy_private(xprt);
}
/*
 * Release a transport copy after a request has been handled.
 *
 * Does nothing unless svc_mt_mode is RPC_SVC_MT_USER, and nothing for
 * door transports.  If SVC_ARGS_CHECK is still set on the transport,
 * svc_args_done() is called first; the copy is then handed back with
 * _svc_done_private() under svc_mutex.
 */
void
svc_done(SVCXPRT *xprt)
{
	if (svc_mt_mode != RPC_SVC_MT_USER || svc_type(xprt) == SVC_DOOR)
		return;

	if ((svc_flags(xprt) & SVC_ARGS_CHECK) != 0)
		svc_args_done(xprt);

	(void) mutex_lock(&svc_mutex);
	_svc_done_private(xprt);
	(void) mutex_unlock(&svc_mutex);
}
/*
 * Note that the arguments of the current request on "xprt" are no longer
 * needed, so its descriptor may be served again.
 *
 * The parent's flags are merged into the copy and SVC_ARGS_CHECK is
 * cleared.  Nothing more happens if the copy has failed or the parent is
 * defunct.  Otherwise:
 *   - a connection transport whose status is XPRT_MOREREQS has its
 *     descriptor appended to the pending ring (svc_pending_fds);
 *   - a connection transport in any other non-idle state is left alone;
 *   - in all remaining cases the descriptor is put back into the poll set
 *     with set_pollfd().
 * Finally a thread is woken up: waiters on svc_thr_fdwait are preferred
 * when an fd was queued or nobody is polling; otherwise, if a thread is
 * polling, a byte is written to the wake-up pipe.
 */
void
svc_args_done(SVCXPRT *xprt)
{
	/*
	 * Only the arrival of a byte matters to the poller, but give it a
	 * defined value instead of writing uninitialized stack contents.
	 */
	char dummy = 0;
	SVCXPRT *parent = SVCEXT(xprt)->parent;
	bool_t wake_up_poller;
	enum xprt_stat stat;

	svc_flags(xprt) |= svc_flags(parent);
	svc_flags(xprt) &= ~SVC_ARGS_CHECK;
	if (svc_failed(xprt) || svc_defunct(parent))
		return;

	if (svc_type(xprt) == SVC_CONNECTION &&
	    (stat = SVC_STAT(xprt)) != XPRT_IDLE) {
		if (stat == XPRT_MOREREQS) {
			/* Queue the fd; the index wraps past CIRCULAR_BUFSIZE. */
			(void) mutex_lock(&svc_mutex);
			svc_pending_fds[svc_last_pending++] = xprt->xp_fd;
			if (svc_last_pending > CIRCULAR_BUFSIZE)
				svc_last_pending = 0;
			svc_total_pending++;
			(void) mutex_unlock(&svc_mutex);
			wake_up_poller = FALSE;
		} else {
			return;
		}
	} else {
		(void) rw_wrlock(&svc_fd_lock);
		set_pollfd(xprt->xp_fd, MASKVAL);
		(void) rw_unlock(&svc_fd_lock);
		wake_up_poller = TRUE;
	}

	/* svc_polling is sampled here without holding svc_mutex. */
	if (!wake_up_poller || !svc_polling) {
		(void) mutex_lock(&svc_mutex);
		if (svc_waiters > 0) {
			(void) cond_broadcast(&svc_thr_fdwait);
			(void) mutex_unlock(&svc_mutex);
			return;
		}
		(void) mutex_unlock(&svc_mutex);
	}

	/* No waiter was woken: interrupt the thread blocked in poll(). */
	if (svc_polling)
		(void) write(svc_pipe[1], &dummy, sizeof (dummy));
}
/*
 * Turn a requested maximum record size into the value that is stored.
 *
 *   -1          -> RPC_MAXDATASIZE plus 2 * sizeof (uint32_t)
 *   other < 0   -> -1 (rejected)
 *   0           -> 0
 *   > 0         -> rounded down to a multiple of BYTES_PER_XDR_UNIT, then
 *                  increased by 2 * sizeof (uint32_t), saturating at
 *                  MAXINT; rejected with -1 if the result is smaller than
 *                  sizeof (struct rpc_msg)
 */
int
__rpc_legal_connmaxrec(int suggested)
{
	const size_t overhead = 2 * sizeof (uint32_t);

	if (suggested == -1)
		return (RPC_MAXDATASIZE + overhead);
	if (suggested < 0)
		return (-1);
	if (suggested == 0)
		return (suggested);

	suggested -= suggested % BYTES_PER_XDR_UNIT;
	if (suggested >= MAXINT - overhead)
		suggested = MAXINT;
	else
		suggested += overhead;

	if (suggested < sizeof (struct rpc_msg))
		return (-1);
	return (suggested);
}
/*
 * Get or set library-wide RPC server parameters.
 *
 * "op" selects the parameter and "info" points to the value: an int for
 * every operation except the __RPC_SVC_EXCLBIND_* pair, which use a
 * bool_t.  _SET operations read *info, _GET operations store into it.
 *
 * Returns TRUE on success.  Returns FALSE for an unknown op, a NULL
 * info pointer, or a rejected value:
 *   RPC_SVC_MTMODE_SET       mode is not one of the RPC_SVC_MT_* values,
 *                            or a different non-NONE mode is already set
 *   RPC_SVC_THRMAX_SET       value < 1
 *   RPC_SVC_USE_POLLFD       value is 0, or the switch was already made
 *   RPC_SVC_CONNMAXREC_SET   rejected by __rpc_legal_connmaxrec()
 *   RPC_SVC_IRTIMEOUT_SET    value < 0
 *   __RPC_SVC_LSTNBKLOG_SET  value <= 0
 */
bool_t
rpc_control(int op, void *info)
{
	int tmp;

	/*
	 * Every supported operation dereferences info.  Only the EXCLBIND
	 * operations used to guard against NULL; reject it uniformly so a
	 * bad call fails cleanly instead of faulting.
	 */
	if (info == NULL)
		return (FALSE);

	switch (op) {
	case RPC_SVC_MTMODE_SET:
		tmp = *((int *)info);
		if (tmp != RPC_SVC_MT_NONE && tmp != RPC_SVC_MT_AUTO &&
		    tmp != RPC_SVC_MT_USER)
			return (FALSE);
		/* Once a threaded mode is chosen it cannot be changed. */
		if (svc_mt_mode != RPC_SVC_MT_NONE && svc_mt_mode != tmp)
			return (FALSE);
		svc_mt_mode = tmp;
		return (TRUE);
	case RPC_SVC_MTMODE_GET:
		*((int *)info) = svc_mt_mode;
		return (TRUE);
	case RPC_SVC_THRMAX_SET:
		if ((tmp = *((int *)info)) < 1)
			return (FALSE);
		(void) mutex_lock(&svc_mutex);
		svc_thr_max = tmp;
		(void) mutex_unlock(&svc_mutex);
		return (TRUE);
	case RPC_SVC_THRMAX_GET:
		*((int *)info) = svc_thr_max;
		return (TRUE);
	case RPC_SVC_THRTOTAL_GET:
		*((int *)info) = svc_thr_total;
		return (TRUE);
	case RPC_SVC_THRCREATES_GET:
		*((int *)info) = svc_thr_total_creates;
		return (TRUE);
	case RPC_SVC_THRERRORS_GET:
		*((int *)info) = svc_thr_total_create_errors;
		return (TRUE);
	case RPC_SVC_USE_POLLFD:
		/* One-way switch: succeeds only the first time it is enabled. */
		if (*((int *)info) && !__rpc_use_pollfd_done) {
			__rpc_use_pollfd_done = 1;
			return (TRUE);
		}
		return (FALSE);
	case RPC_SVC_CONNMAXREC_SET:
		tmp = __rpc_legal_connmaxrec(*(int *)info);
		if (tmp >= 0) {
			__rpc_connmaxrec = tmp;
			return (TRUE);
		} else {
			return (FALSE);
		}
	case RPC_SVC_CONNMAXREC_GET:
		*((int *)info) = __rpc_connmaxrec;
		return (TRUE);
	case RPC_SVC_IRTIMEOUT_SET:
		tmp = *((int *)info);
		if (tmp >= 0) {
			__rpc_irtimeout = tmp;
			return (TRUE);
		} else {
			return (FALSE);
		}
	case __RPC_SVC_EXCLBIND_SET:
		__rpc_tp_exclbind = *((bool_t *)info);
		return (TRUE);
	case __RPC_SVC_EXCLBIND_GET:
		*((bool_t *)info) = __rpc_tp_exclbind;
		return (TRUE);
	case __RPC_SVC_LSTNBKLOG_SET:
		tmp = *(int *)info;
		if (tmp > 0) {
			__svc_lstnbklog = tmp;
			return (TRUE);
		}
		return (FALSE);
	case __RPC_SVC_LSTNBKLOG_GET:
		*(int *)info = __svc_lstnbklog;
		return (TRUE);
	default:
		return (FALSE);
	}
}