#include "mt.h"
#include "rpc_mt.h"
#include <assert.h>
#include <rpc/rpc.h>
#include <errno.h>
#include <sys/byteorder.h>
#include <sys/mkdev.h>
#include <sys/poll.h>
#include <syslog.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <limits.h>
/* Size in bytes of the ct_mcall buffer that holds the pre-marshalled call header. */
#define MCALL_MSG_SIZE 24
/*
 * Time-unit conversions.  The *_TO_NS forms cast to hrtime_t before
 * multiplying; NSECS_TO_MS uses integer division.
 */
#define SECS_TO_NS(x) ((hrtime_t)(x) * 1000 * 1000 * 1000)
#define MSECS_TO_NS(x) ((hrtime_t)(x) * 1000 * 1000)
#define USECS_TO_NS(x) ((hrtime_t)(x) * 1000)
#define NSECS_TO_MS(x) ((x) / 1000 / 1000)
/* Fallback MIN, defined only if no header provided one.  Evaluates its arguments twice. */
#ifndef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
extern int __rpc_timeval_to_msec(struct timeval *);
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
caddr_t);
extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
rpcvers_t, uint_t, uint_t, const struct timeval *);
static struct clnt_ops *clnt_vc_ops(void);
static int read_vc(void *, caddr_t, int);
static int write_vc(void *, caddr_t, int);
static int t_rcvall(int, char *, int);
static bool_t time_not_ok(struct timeval *);
struct ct_data;
static bool_t set_up_connection(int, struct netbuf *,
struct ct_data *, const struct timeval *);
static bool_t set_io_mode(struct ct_data *, int);
/* Held in _clnt_vc_create_timed() around the one-time creation of vctbl. */
static mutex_t vctbl_lock = DEFAULTMUTEX;
/* Handle returned by rpc_fd_init(); passed to rpc_fd_lock()/rpc_fd_unlock(). */
static void *vctbl = NULL;
/* syslog format and message fragments shared by the routines in this file. */
static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";
/*
 * Per-handle private data for a connection-oriented client; stored in
 * cl->cl_private by _clnt_vc_create_timed().
 */
struct ct_data {
int ct_fd; /* transport descriptor used for all I/O */
bool_t ct_closeit; /* TRUE: clnt_vc_destroy() t_close()s ct_fd */
int ct_tsdu; /* tsdu reported by t_getinfo(); bounds each t_snd in write_vc() */
int ct_wait; /* timeout in milliseconds; used as the poll() timeout in read_vc() */
bool_t ct_waitset; /* TRUE once CLSET_TIMEOUT has been applied */
struct netbuf ct_addr; /* server address; buf is freed with the handle */
struct rpc_err ct_error; /* not referenced by the code in this file section */
char ct_mcall[MCALL_MSG_SIZE]; /* pre-marshalled call header; xid is the first word */
uint_t ct_mpos; /* XDR position just past the marshalled header */
XDR ct_xdrs; /* record stream built by xdrrec_create() */
bool_t ct_is_oneway; /* TRUE while clnt_vc_send() is in progress, FALSE for clnt_vc_call() */
bool_t ct_is_blocking; /* cached blocking state of ct_fd (O_NONBLOCK clear) */
ushort_t ct_io_mode; /* RPC_CL_BLOCKING or RPC_CL_NONBLOCKING */
ushort_t ct_blocking_mode; /* flush mode applied when CLFLUSH asks for the default */
uint_t ct_bufferSize; /* capacity of ct_buffer in bytes */
uint_t ct_bufferPendingSize; /* bytes stored in ct_buffer and not yet sent */
char *ct_buffer; /* circular buffer; allocated on first use by addInBuffer() */
char *ct_bufferWritePtr; /* where the next stored byte goes */
char *ct_bufferReadPtr; /* first stored byte still to be sent */
};
/*
 * Node of the singly linked lists that track handles in non-blocking
 * mode.  'next' must remain the first member: the list heads below are
 * initialised to their own address cast to a node pointer, which acts
 * as the end-of-list sentinel.
 */
struct nb_reg_node {
struct nb_reg_node *next;
struct ct_data *ct;
};
/* Registered non-blocking handles; empty when it points at itself. */
static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
/* Free nodes available for reuse; empty when it points at itself. */
static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free;
/* Set once flush_registered_clients() has been installed with atexit(). */
static bool_t exit_handler_set = FALSE;
/* Taken by register_nb() and unregister_nb() around list updates. */
static mutex_t nb_list_mutex = DEFAULTMUTEX;
/*
 * Helpers for the sentinel-terminated singly linked lists of
 * struct nb_reg_node.  'l' must be the list-head variable itself (an
 * lvalue): the address of that variable, cast to a node pointer, marks
 * the end of the list.
 *
 * LIST_ISEMPTY   true when the head points at its own sentinel.
 * LIST_CLR       reset the head to the empty state.
 * LIST_ADD       push 'node' at the front.  The new node must link to the
 *                current head (l), not to l->next, otherwise the previous
 *                first element is silently dropped from the list.
 * LIST_EXTRACT   pop the first element into 'node' (list must not be empty).
 * LIST_FOR_EACH  iterate 'node' over every element.
 */
#define LIST_ISEMPTY(l) ((l) == (struct nb_reg_node *)&(l))
#define LIST_CLR(l) ((l) = (struct nb_reg_node *)&(l))
#define LIST_ADD(l, node) ((node)->next = (l), (l) = (node))
#define LIST_EXTRACT(l, node) ((node) = (l), (l) = (l)->next)
#define LIST_FOR_EACH(l, node) \
	for ((node) = (l); (node) != (struct nb_reg_node *)&(l); \
	    (node) = (node)->next)
/* Default capacity, in bytes, of the per-handle pending-data buffer. */
#define DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)
static int nb_send(struct ct_data *, void *, unsigned int);
static int do_flush(struct ct_data *, uint_t);
static bool_t set_flush_mode(struct ct_data *, int);
static bool_t set_blocking_connection(struct ct_data *, bool_t);
static int register_nb(struct ct_data *);
static int unregister_nb(struct ct_data *);
/*
 * Put the handle's descriptor into blocking (blocking == TRUE) or
 * non-blocking mode by toggling O_NONBLOCK, and remember the result in
 * ct->ct_is_blocking.  Nothing is done when the cached state already
 * matches.  Returns TRUE on success, FALSE (after logging) when either
 * fcntl() call fails.
 */
static bool_t
set_blocking_connection(struct ct_data *ct, bool_t blocking)
{
	int fl;

	if (ct->ct_is_blocking != blocking) {
		fl = fcntl(ct->ct_fd, F_GETFL, 0);
		if (fl < 0) {
			(void) syslog(LOG_ERR, "set_blocking_connection : %s",
			    no_fcntl_getfl_str);
			return (FALSE);
		}

		if (blocking)
			fl &= ~O_NONBLOCK;
		else
			fl |= O_NONBLOCK;

		if (fcntl(ct->ct_fd, F_SETFL, fl) != 0) {
			(void) syslog(LOG_ERR, "set_blocking_connection : %s",
			    no_nonblock_str);
			return (FALSE);
		}
		ct->ct_is_blocking = blocking;
	}
	return (TRUE);
}
/*
 * Create a connection-oriented client handle on fd without a connect
 * timeout: forwards every argument to _clnt_vc_create_timed() with a
 * NULL timeout and returns its result.
 */
CLIENT *
clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
    const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
{
	CLIENT *handle;

	handle = _clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
	    recvsz, NULL);
	return (handle);
}
/*
 * Build a client handle for program 'prog', version 'vers' on the
 * transport endpoint 'fd'.
 *
 * svcaddr  server address; handed to set_up_connection().
 * sendsz/recvsz  requested buffer sizes, adjusted by __rpc_get_t_size()
 *          against the transport's tsdu.
 * tp       optional connect timeout handed to set_up_connection();
 *          NULL means no explicit timeout.
 *
 * Returns the new handle, or NULL on failure.  On failure everything
 * allocated here is released; rpc_createerr is filled in on every failure
 * path except an xdr_callhdr() failure, which leaves it untouched.
 */
CLIENT *
_clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
    rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
{
	CLIENT *cl;
	struct ct_data *ct;
	struct timeval now;
	struct rpc_msg call_msg;
	struct t_info tinfo;
	int flag;

	cl = malloc(sizeof (*cl));
	/* Clear the address buffer early so the err path can free it safely. */
	if ((ct = malloc(sizeof (*ct))) != NULL)
		ct->ct_addr.buf = NULL;

	if ((cl == NULL) || (ct == NULL)) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_vc_str, __no_mem_str);
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		goto err;
	}

	/*
	 * vctbl_lock only serialises the one-time creation of vctbl; it is
	 * dropped again before the connection is set up.
	 */
	sig_mutex_lock(&vctbl_lock);
	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		sig_mutex_unlock(&vctbl_lock);
		goto err;
	}
	sig_mutex_unlock(&vctbl_lock);

	/* Start in blocking I/O mode; the pending buffer is allocated lazily. */
	ct->ct_io_mode = RPC_CL_BLOCKING;
	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
	ct->ct_buffer = NULL;
	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
	ct->ct_bufferPendingSize = 0;
	ct->ct_bufferWritePtr = NULL;
	ct->ct_bufferReadPtr = NULL;

	/* Record whether the descriptor is currently blocking. */
	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
		    no_fcntl_getfl_str);
		/*
		 * fcntl() reports a system errno, so it belongs in re_errno
		 * (with re_terrno cleared), like the other RPC_SYSTEMERROR
		 * paths in this function.
		 */
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_errno = errno;
		rpc_createerr.cf_error.re_terrno = 0;
		goto err;
	}
	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;

	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
		goto err;
	}

	ct->ct_fd = fd;
	/* Default 30000 ms; replaced by clnt_vc_call() or CLSET_TIMEOUT. */
	ct->ct_wait = 30000;
	ct->ct_waitset = FALSE;
	/* The descriptor is not closed on destroy unless CLSET_FD_CLOSE is used. */
	ct->ct_closeit = FALSE;

	/* Seed the transaction id from the pid and the current time. */
	(void) gettimeofday(&now, (struct timezone *)0);
	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
	call_msg.rm_call.cb_prog = prog;
	call_msg.rm_call.cb_vers = vers;

	/* Pre-serialise the constant part of the call header into ct_mcall. */
	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
		goto err;
	}
	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
	XDR_DESTROY(&(ct->ct_xdrs));

	if (t_getinfo(fd, &tinfo) == -1) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = t_errno;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}

	/* Settle the send and receive sizes against the transport's tsdu. */
	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
	if ((sendsz == 0) || (recvsz == 0)) {
		rpc_createerr.cf_stat = RPC_TLIERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = 0;
		goto err;
	}
	ct->ct_tsdu = tinfo.tsdu;

	/*
	 * x_ops is cleared first and checked afterwards: a still-NULL x_ops
	 * after xdrrec_create() is treated as an allocation failure.
	 */
	ct->ct_xdrs.x_ops = NULL;
	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
	    read_vc, write_vc);
	if (ct->ct_xdrs.x_ops == NULL) {
		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
		rpc_createerr.cf_error.re_terrno = 0;
		rpc_createerr.cf_error.re_errno = ENOMEM;
		goto err;
	}

	cl->cl_ops = clnt_vc_ops();
	cl->cl_private = (caddr_t)ct;
	cl->cl_auth = authnone_create();
	cl->cl_tp = NULL;
	cl->cl_netid = NULL;
	return (cl);

err:
	if (ct) {
		free(ct->ct_addr.buf);
		free(ct);
	}
	free(cl);
	return (NULL);
}
#define TCPOPT_BUFSIZE 128
static int
_set_tcp_conntime(int fd, int optval)
{
struct t_optmgmt req, res;
struct opthdr *opt;
int *ip;
char buf[TCPOPT_BUFSIZE];
opt = (struct opthdr *)buf;
opt->level = IPPROTO_TCP;
opt->name = TCP_CONN_ABORT_THRESHOLD;
opt->len = sizeof (int);
req.flags = T_NEGOTIATE;
req.opt.len = sizeof (struct opthdr) + opt->len;
req.opt.buf = (char *)opt;
ip = (int *)((char *)buf + sizeof (struct opthdr));
*ip = optval;
res.flags = 0;
res.opt.buf = (char *)buf;
res.opt.maxlen = sizeof (buf);
if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
return (-1);
}
return (0);
}
static int
_get_tcp_conntime(int fd)
{
struct t_optmgmt req, res;
struct opthdr *opt;
int *ip, retval;
char buf[TCPOPT_BUFSIZE];
opt = (struct opthdr *)buf;
opt->level = IPPROTO_TCP;
opt->name = TCP_CONN_ABORT_THRESHOLD;
opt->len = sizeof (int);
req.flags = T_CURRENT;
req.opt.len = sizeof (struct opthdr) + opt->len;
req.opt.buf = (char *)opt;
ip = (int *)((char *)buf + sizeof (struct opthdr));
*ip = 0;
res.flags = 0;
res.opt.buf = (char *)buf;
res.opt.maxlen = sizeof (buf);
if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
return (-1);
}
ip = (int *)((char *)buf + sizeof (struct opthdr));
retval = *ip;
return (retval);
}
/*
 * Bring fd into a connected state and record the server address in
 * ct->ct_addr.
 *
 * T_IDLE: connect to svcaddr (required), retrying up to three times.
 * When tp is given, the overall time is bounded by it and the TCP
 * connection-abort threshold is temporarily lowered to the remaining
 * time, then restored.
 * T_DATAXFER / T_OUTCON: the endpoint is taken as is; svcaddr, if any,
 * is copied into ct->ct_addr.
 * Any other state fails with RPC_UNKNOWNADDR.
 *
 * Returns TRUE on success; on failure returns FALSE with rpc_createerr
 * filled in.
 */
static bool_t
set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
const struct timeval *tp)
{
int state;
struct t_call sndcallstr, *rcvcall;
int nconnect;
bool_t connected, do_rcv_connect;
/* Saved abort threshold; -1 means "not read / not to be restored". */
int curr_time = -1;
hrtime_t start;
hrtime_t tout;
ct->ct_addr.len = 0;
state = t_getstate(fd);
if (state == -1) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_errno = 0;
rpc_createerr.cf_error.re_terrno = t_errno;
return (FALSE);
}
switch (state) {
case T_IDLE:
/* Not connected yet: an address to connect to is mandatory. */
if (svcaddr == NULL) {
rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
return (FALSE);
}
rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
if (rcvcall == NULL) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = t_errno;
rpc_createerr.cf_error.re_errno = errno;
return (FALSE);
}
rcvcall->udata.maxlen = 0;
sndcallstr.addr = *svcaddr;
sndcallstr.opt.len = 0;
sndcallstr.udata.len = 0;
connected = FALSE;
do_rcv_connect = FALSE;
/* start and tout are only set, and only read, when tp != NULL. */
if (tp != NULL) {
start = gethrtime();
tout = SECS_TO_NS(tp->tv_sec) +
USECS_TO_NS(tp->tv_usec);
curr_time = _get_tcp_conntime(fd);
}
for (nconnect = 0; nconnect < 3; nconnect++) {
if (tp != NULL) {
hrtime_t elapsed = gethrtime() - start;
/* Out of time: give up without another attempt. */
if (elapsed >= tout)
break;
if (curr_time != -1) {
int ms;
/* Remaining time in ms, clamped to INT_MAX and rounded up. */
if (NSECS_TO_MS(tout - elapsed) >=
INT_MAX) {
ms = INT_MAX;
} else {
ms = (int)
NSECS_TO_MS(tout - elapsed);
if (MSECS_TO_NS(ms) !=
tout - elapsed)
ms++;
}
(void) _set_tcp_conntime(fd, ms);
}
}
if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
connected = TRUE;
break;
}
/* Consume a pending disconnect; any error other than EINTR ends the retries. */
if (t_errno == TLOOK) {
switch (t_look(fd)) {
case T_DISCONNECT:
(void) t_rcvdis(fd, (struct
t_discon *) NULL);
break;
default:
break;
}
} else if (!(t_errno == TSYSERR && errno == EINTR)) {
break;
}
/* A connect still in progress is completed with t_rcvconnect() below. */
if ((state = t_getstate(fd)) == T_OUTCON) {
do_rcv_connect = TRUE;
break;
}
if (state != T_IDLE) {
break;
}
}
if (do_rcv_connect) {
/* Retry t_rcvconnect() only while it is interrupted by a signal. */
do {
if (t_rcvconnect(fd, rcvcall) != -1) {
connected = TRUE;
break;
}
} while (t_errno == TSYSERR && errno == EINTR);
}
/* Restore the abort threshold that was in effect before the attempts. */
if (curr_time != -1) {
(void) _set_tcp_conntime(fd, curr_time);
}
if (!connected) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = t_errno;
rpc_createerr.cf_error.re_errno = errno;
(void) t_free((char *)rcvcall, T_CALL);
return (FALSE);
}
if (ct->ct_addr.buf)
free(ct->ct_addr.buf);
/*
 * Take over the address buffer from rcvcall; clearing rcvcall's
 * pointer keeps t_free() from releasing it.
 */
ct->ct_addr = rcvcall->addr;
rcvcall->addr.buf = NULL;
(void) t_free((char *)rcvcall, T_CALL);
break;
case T_DATAXFER:
case T_OUTCON:
/* Already connected or connecting: just remember the address, if given. */
if (svcaddr == NULL) {
ct->ct_addr.len = 0;
} else {
ct->ct_addr.buf = malloc(svcaddr->len);
if (ct->ct_addr.buf == NULL) {
(void) syslog(LOG_ERR, clnt_vc_errstr,
clnt_vc_str, __no_mem_str);
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_errno = errno;
rpc_createerr.cf_error.re_terrno = 0;
return (FALSE);
}
(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
(size_t)svcaddr->len);
ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
}
break;
default:
rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
return (FALSE);
}
return (TRUE);
}
/*
 * Perform one remote procedure call on the handle.
 *
 * Encodes the call (pre-marshalled header with a new xid, procedure
 * number, credentials, arguments), sends it as one record, then reads
 * replies until one carries the matching xid and decodes the results.
 * On an authentication error the credentials are refreshed and the call
 * re-sent, at most twice.
 *
 * With no results routine and a zero timeout the record is only queued
 * (not flushed) and RPC_SUCCESS is returned; with a zero timeout alone
 * the call is sent and RPC_TIMEDOUT returned without waiting for a reply.
 *
 * The per-descriptor lock from rpc_fd_lock() is released on every
 * return path.  Returns the resulting status, also left in
 * rpc_callerr.re_status on all but the failed-initial-flush path.
 */
static enum clnt_stat
clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
{
struct ct_data *ct = (struct ct_data *)cl->cl_private;
XDR *xdrs = &(ct->ct_xdrs);
struct rpc_msg reply_msg;
uint32_t x_id;
/* The xid is the first word of the pre-marshalled header. */
uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);
bool_t shipnow;
int refreshes = 2;
if (rpc_fd_lock(vctbl, ct->ct_fd)) {
rpc_callerr.re_status = RPC_FAILED;
rpc_callerr.re_errno = errno;
rpc_fd_unlock(vctbl, ct->ct_fd);
return (RPC_FAILED);
}
ct->ct_is_oneway = FALSE;
/* Data buffered by earlier one-way sends must go out before this call. */
if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (RPC_FAILED);
}
}
/*
 * Without a CLSET_TIMEOUT value, adopt the caller's timeout if it is
 * within limits; otherwise the stored value overrides the argument.
 */
if (!ct->ct_waitset) {
if (time_not_ok(&timeout) == FALSE)
ct->ct_wait = __rpc_timeval_to_msec(&timeout);
} else {
timeout.tv_sec = (ct->ct_wait / 1000);
timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
}
shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
(timeout.tv_usec == 0)) ? FALSE : TRUE;
call_again:
xdrs->x_op = XDR_ENCODE;
rpc_callerr.re_status = RPC_SUCCESS;
/* The stored xid is in network order: convert, decrement, store back. */
x_id = ntohl(*msg_x_id) - 1;
*msg_x_id = htonl(x_id);
if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
(!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
(!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
(!xdr_args(xdrs, args_ptr))) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTENCODEARGS;
(void) xdrrec_endofrecord(xdrs, TRUE);
rpc_fd_unlock(vctbl, ct->ct_fd);
return (rpc_callerr.re_status);
}
} else {
/*
 * RPCSEC_GSS: the procedure number is appended to the header in
 * ct_mcall and __rpc_gss_wrap() encodes header and arguments.
 */
uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
IXDR_PUT_U_INT32(u, proc);
if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTENCODEARGS;
(void) xdrrec_endofrecord(xdrs, TRUE);
rpc_fd_unlock(vctbl, ct->ct_fd);
return (rpc_callerr.re_status);
}
}
if (!xdrrec_endofrecord(xdrs, shipnow)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (rpc_callerr.re_status = RPC_CANTSEND);
}
if (!shipnow) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (RPC_SUCCESS);
}
/* Zero timeout: the call has been sent, but no reply is awaited. */
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (rpc_callerr.re_status = RPC_TIMEDOUT);
}
xdrs->x_op = XDR_DECODE;
/* Read replies until one carries the xid of this call. */
for (;;) {
reply_msg.acpted_rply.ar_verf = _null_auth;
reply_msg.acpted_rply.ar_results.where = NULL;
reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
if (!xdrrec_skiprecord(xdrs)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (rpc_callerr.re_status);
}
/* A header that fails to decode without an error status is skipped. */
if (!xdr_replymsg(xdrs, &reply_msg)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
continue;
rpc_fd_unlock(vctbl, ct->ct_fd);
return (rpc_callerr.re_status);
}
if (reply_msg.rm_xid == x_id)
break;
}
if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
(reply_msg.acpted_rply.ar_stat == SUCCESS))
rpc_callerr.re_status = RPC_SUCCESS;
else
__seterr_reply(&reply_msg, &(rpc_callerr));
if (rpc_callerr.re_status == RPC_SUCCESS) {
/* Validate the verifier, then decode (or unwrap) the results. */
if (!AUTH_VALIDATE(cl->cl_auth,
&reply_msg.acpted_rply.ar_verf)) {
rpc_callerr.re_status = RPC_AUTHERROR;
rpc_callerr.re_why = AUTH_INVALIDRESP;
} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
if (!(*xdr_results)(xdrs, results_ptr)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status =
RPC_CANTDECODERES;
}
} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
results_ptr)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTDECODERES;
}
}
else if (rpc_callerr.re_status == RPC_AUTHERROR) {
/*
 * Refresh the credentials and resend while attempts remain; the
 * status is set again afterwards so that a failed refresh is still
 * reported as RPC_AUTHERROR.
 */
if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
goto call_again;
else
rpc_callerr.re_status = RPC_AUTHERROR;
}
/* Release verifier storage allocated while decoding the reply. */
if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
xdrs->x_op = XDR_FREE;
(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
}
rpc_fd_unlock(vctbl, ct->ct_fd);
return (rpc_callerr.re_status);
}
/*
 * Send a one-way call: encode the request with a new xid and push the
 * record out without waiting for any reply.  The handle is marked
 * one-way for the duration, which write_vc() checks to select the
 * non-blocking send path.  Any failure is reported through
 * rpc_callerr.re_status, which is also the return value.
 */
static enum clnt_stat
clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
{
	struct ct_data *priv = (struct ct_data *)cl->cl_private;
	XDR *stream = &(priv->ct_xdrs);
	uint32_t *xid_slot = (uint32_t *)(priv->ct_mcall);
	uint32_t next_xid;
	bool_t encoded;

	if (rpc_fd_lock(vctbl, priv->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, priv->ct_fd);
		return (RPC_FAILED);
	}

	priv->ct_is_oneway = TRUE;
	stream->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;

	/* The xid is kept in network order at the start of the header. */
	next_xid = ntohl(*xid_slot) - 1;
	*xid_slot = htonl(next_xid);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		encoded = XDR_PUTBYTES(stream, priv->ct_mcall, priv->ct_mpos) &&
		    XDR_PUTINT32(stream, (int32_t *)&proc) &&
		    AUTH_MARSHALL(cl->cl_auth, stream) &&
		    xdr_args(stream, args_ptr);
	} else {
		/* RPCSEC_GSS: append proc to the header and let gss wrap it. */
		uint32_t *u = (uint32_t *)&priv->ct_mcall[priv->ct_mpos];

		IXDR_PUT_U_INT32(u, proc);
		encoded = __rpc_gss_wrap(cl->cl_auth, priv->ct_mcall,
		    ((char *)u) - priv->ct_mcall, stream, xdr_args, args_ptr);
	}

	if (!encoded && rpc_callerr.re_status == RPC_SUCCESS)
		rpc_callerr.re_status = RPC_CANTENCODEARGS;

	/* The record is terminated and flushed whether or not encoding worked. */
	(void) xdrrec_endofrecord(stream, TRUE);
	rpc_fd_unlock(vctbl, priv->ct_fd);
	return (rpc_callerr.re_status);
}
/*
 * Copy the current rpc_callerr error state into *errp.  The handle
 * argument is not consulted.
 */
static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
{
	const struct rpc_err *current = &rpc_callerr;

	*errp = *current;
}
/*
 * Release storage held by a decoded result: runs xdr_res over res_ptr
 * with the handle's stream in XDR_FREE mode, under the per-descriptor
 * lock.  Returns whatever xdr_res returns.
 */
static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
{
	struct ct_data *priv = (struct ct_data *)cl->cl_private;
	bool_t freed;

	(void) rpc_fd_lock(vctbl, priv->ct_fd);
	priv->ct_xdrs.x_op = XDR_FREE;
	freed = (*xdr_res)(&(priv->ct_xdrs), res_ptr);
	rpc_fd_unlock(vctbl, priv->ct_fd);
	return (freed);
}
/*
 * cl_abort operation for this transport (installed by clnt_vc_ops()).
 * Intentionally does nothing.
 */
static void
clnt_vc_abort(void)
{
}
/*
 * Get or set handle attributes.
 *
 * request  one of the CLSET_, CLGET_ or CLFLUSH codes handled below.
 * info     request-specific in/out argument.  It may be NULL only for
 *          CLSET_FD_CLOSE, CLSET_FD_NCLOSE and CLFLUSH; every other
 *          request fails when info is NULL.
 *
 * Returns TRUE on success and FALSE for an unknown request, a rejected
 * value, or a failure to take the per-descriptor lock.  The lock is
 * released on every return path.
 */
static bool_t
clnt_vc_control(CLIENT *cl, int request, char *info)
{
bool_t ret;
struct ct_data *ct = (struct ct_data *)cl->cl_private;
if (rpc_fd_lock(vctbl, ct->ct_fd)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
/* Requests that do not need (or may omit) the info argument. */
switch (request) {
case CLSET_FD_CLOSE:
ct->ct_closeit = TRUE;
rpc_fd_unlock(vctbl, ct->ct_fd);
return (TRUE);
case CLSET_FD_NCLOSE:
ct->ct_closeit = FALSE;
rpc_fd_unlock(vctbl, ct->ct_fd);
return (TRUE);
case CLFLUSH:
/*
 * Only meaningful in non-blocking mode.  A NULL info or
 * RPC_CL_DEFAULT_FLUSH selects the handle's configured flush mode.
 */
if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
int res;
res = do_flush(ct, (info == NULL ||
*(int *)info == RPC_CL_DEFAULT_FLUSH)?
ct->ct_blocking_mode: *(int *)info);
ret = (0 == res);
} else {
ret = FALSE;
}
rpc_fd_unlock(vctbl, ct->ct_fd);
return (ret);
}
/* Everything below dereferences info. */
if (info == NULL) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
switch (request) {
case CLSET_TIMEOUT:
if (time_not_ok((struct timeval *)info)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
ct->ct_waitset = TRUE;
break;
case CLGET_TIMEOUT:
/* ct_wait is in milliseconds. */
((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000;
break;
case CLGET_SERVER_ADDR:
/* Copies the raw address bytes; the caller's buffer must hold ct_addr.len bytes. */
(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
break;
case CLGET_FD:
*(int *)info = ct->ct_fd;
break;
case CLGET_SVC_ADDR:
/* Structure copy: the returned netbuf shares ct_addr's buffer. */
*(struct netbuf *)info = ct->ct_addr;
break;
case CLSET_SVC_ADDR:
/* Changing the address is compiled out; the request always fails. */
#ifdef undef
if (t_snddis(ct->ct_fd, NULL) == -1) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = t_errno;
rpc_createerr.cf_error.re_errno = errno;
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
ct, NULL);
rpc_fd_unlock(vctbl, ct->ct_fd);
return (ret);
#else
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
#endif
case CLGET_XID:
/* The xid is the first word of the pre-marshalled header, in network order. */
*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
break;
case CLSET_XID:
/* Stored one higher because the call paths decrement before sending. */
*(uint32_t *)ct->ct_mcall = htonl(*(uint32_t *)info + 1);
break;
case CLGET_VERS:
/* The version is read from the fifth XDR unit of the header. */
*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
4 * BYTES_PER_XDR_UNIT));
break;
case CLSET_VERS:
*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
htonl(*(uint32_t *)info);
break;
case CLGET_PROG:
/* The program number is read from the fourth XDR unit of the header. */
*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
3 * BYTES_PER_XDR_UNIT));
break;
case CLSET_PROG:
*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
htonl(*(uint32_t *)info);
break;
case CLSET_IO_MODE:
if (!set_io_mode(ct, *(int *)info)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
break;
case CLSET_FLUSH_MODE:
if (!set_flush_mode(ct, *(int *)info)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
break;
case CLGET_FLUSH_MODE:
*(rpcflushmode_t *)info = ct->ct_blocking_mode;
break;
case CLGET_IO_MODE:
*(rpciomode_t *)info = ct->ct_io_mode;
break;
case CLGET_CURRENT_REC_SIZE:
/* Number of bytes currently buffered and not yet sent. */
*(int *)info = ct->ct_bufferPendingSize;
break;
case CLSET_CONNMAXREC_SIZE:
/* The buffer cannot be resized while it holds unsent data. */
if (ct->ct_bufferPendingSize != 0) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
if (ct->ct_bufferSize == *(uint_t *)info)
break;
ct->ct_bufferSize = *(uint_t *)info;
/* Drop the old buffer; addInBuffer() allocates one of the new size on demand. */
if (ct->ct_buffer) {
free(ct->ct_buffer);
ct->ct_buffer = NULL;
ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
}
break;
case CLGET_CONNMAXREC_SIZE:
*(uint_t *)info = ct->ct_bufferSize;
break;
default:
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
rpc_fd_unlock(vctbl, ct->ct_fd);
return (TRUE);
}
/*
 * Destroy a client handle and everything it owns.
 *
 * In non-blocking mode the pending data is flushed (blocking) and the
 * handle is removed from the exit-time flush list.  The descriptor is
 * closed only if CLSET_FD_CLOSE was requested.  The descriptor number is
 * saved up front because the lock must be released after ct itself has
 * been freed.
 */
static void
clnt_vc_destroy(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	int ct_fd = ct->ct_fd;

	(void) rpc_fd_lock(vctbl, ct_fd);

	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
		(void) unregister_nb(ct);
	}

	if (ct->ct_closeit)
		(void) t_close(ct_fd);
	XDR_DESTROY(&(ct->ct_xdrs));

	/*
	 * Release the pending-data buffer too; it is allocated lazily by
	 * addInBuffer() and would otherwise leak when a non-blocking handle
	 * is destroyed.  free(NULL) is a no-op, so no guards are needed.
	 */
	free(ct->ct_buffer);
	free(ct->ct_addr.buf);
	free(ct);

	/* Only non-empty netid and tp strings are released. */
	if (cl->cl_netid && cl->cl_netid[0])
		free(cl->cl_netid);
	if (cl->cl_tp && cl->cl_tp[0])
		free(cl->cl_tp);
	free(cl);

	rpc_fd_unlock(vctbl, ct_fd);
}
/*
 * Read callback handed to xdrrec_create(): wait until the connection is
 * readable, then receive up to len bytes into buf.
 *
 * ct_tmp  the handle's struct ct_data.
 * Returns the number of bytes received, 0 when len is 0, or -1 on
 * timeout or error; rpc_callerr is filled in on every failure except a
 * gettimeofday() failure before polling, which is only logged.
 *
 * While waiting, the descriptors in svc_pollfd are polled as well (when
 * _svc_getreqset_proc is set), so incoming requests for this process's
 * own services are dispatched instead of being starved.
 */
static int
read_vc(void *ct_tmp, caddr_t buf, int len)
{
	static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
	struct pollfd *pfdp;
	int npfd;
	struct ct_data *ct = ct_tmp;
	struct timeval starttime;
	struct timeval curtime;
	int poll_time;
	int delta;

	if (len == 0)
		return (0);

	/*
	 * Start with a single slot; the array is resized below when service
	 * descriptors must be polled too.
	 */
	npfd = 1;
	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
	if (pfdp == NULL) {
		(void) syslog(LOG_ERR, clnt_vc_errstr,
		    clnt_read_vc_str, __no_mem_str);
		rpc_callerr.re_status = RPC_SYSTEMERROR;
		rpc_callerr.re_errno = errno;
		rpc_callerr.re_terrno = 0;
		return (-1);
	}

	/* Slot 0 is always the connection this call is waiting on. */
	pfdp[0].fd = ct->ct_fd;
	pfdp[0].events = MASKVAL;
	pfdp[0].revents = 0;
	poll_time = ct->ct_wait;
	if (gettimeofday(&starttime, NULL) == -1) {
		syslog(LOG_ERR, "Unable to get time of day: %m");
		return (-1);
	}

	for (;;) {
		extern void (*_svc_getreqset_proc)();
		extern pollfd_t *svc_pollfd;
		extern int svc_max_pollfd;
		int fds;

		if (_svc_getreqset_proc) {
			/* svc_pollfd and svc_max_pollfd are read under svc_fd_lock. */
			sig_rw_rdlock(&svc_fd_lock);
			if (npfd != (svc_max_pollfd + 1)) {
				struct pollfd *tmp_pfdp = realloc(pfdp,
				    sizeof (struct pollfd) *
				    (svc_max_pollfd + 1));
				if (tmp_pfdp == NULL) {
					sig_rw_unlock(&svc_fd_lock);
					(void) syslog(LOG_ERR, clnt_vc_errstr,
					    clnt_read_vc_str, __no_mem_str);
					rpc_callerr.re_status = RPC_SYSTEMERROR;
					rpc_callerr.re_errno = errno;
					rpc_callerr.re_terrno = 0;
					return (-1);
				}
				pfdp = tmp_pfdp;
				npfd = svc_max_pollfd + 1;
				/* Re-register the array, which may have moved. */
				(void) pthread_setspecific(pfdp_key, pfdp);
			}
			if (npfd > 1)
				(void) memcpy(&pfdp[1], svc_pollfd,
				    sizeof (struct pollfd) * (npfd - 1));
			sig_rw_unlock(&svc_fd_lock);
		} else {
			npfd = 1;
		}

		switch (fds = poll(pfdp, npfd, poll_time)) {
		case 0:
			rpc_callerr.re_status = RPC_TIMEDOUT;
			return (-1);
		case -1:
			if (errno != EINTR)
				continue;

			/* Interrupted: charge the elapsed time to the budget. */
			if (gettimeofday(&curtime, NULL) == -1) {
				syslog(LOG_ERR,
				    "Unable to get time of day: %m");
				errno = 0;
				continue;
			}
			delta = (curtime.tv_sec -
			    starttime.tv_sec) * 1000 +
			    (curtime.tv_usec -
			    starttime.tv_usec) / 1000;
			poll_time -= delta;
			/*
			 * Restart the measurement from now.  poll_time has
			 * already been reduced, so measuring the next
			 * interval from the original start would subtract
			 * the same time again on every interruption.
			 */
			starttime = curtime;
			errno = 0;
			if (poll_time < 0) {
				rpc_callerr.re_status = RPC_TIMEDOUT;
				return (-1);
			}
			continue;
		}

		if (pfdp[0].revents == 0) {
			/* Only service descriptors fired: dispatch, poll again. */
			(*_svc_getreqset_proc)(&pfdp[1], fds);
			continue;
		}

		if (pfdp[0].revents & POLLNVAL) {
			/* Report an invalid descriptor as EBADF. */
			rpc_callerr.re_status = RPC_CANTRECV;
			rpc_callerr.re_errno = errno = EBADF;
			return (-1);
		}

		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
			rpc_callerr.re_status = RPC_CANTRECV;
			rpc_callerr.re_errno = errno = EPIPE;
			return (-1);
		}
		break;
	}

	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
	case 0:
		/* Nothing received although poll reported data: an error. */
		rpc_callerr.re_errno = ENOLINK;
		rpc_callerr.re_terrno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		len = -1;
		break;
	case -1:
		rpc_callerr.re_terrno = t_errno;
		rpc_callerr.re_errno = 0;
		rpc_callerr.re_status = RPC_CANTRECV;
		break;
	}
	return (len);
}
/*
 * Write callback handed to xdrrec_create(): send len bytes from buf.
 *
 * One-way calls on a non-blocking handle go through nb_send(), which may
 * buffer the data; a record larger than a positive tsdu is refused.
 * Otherwise the data is sent with t_snd(), in a single call when the
 * tsdu is 0 or -1, or in tsdu-sized pieces flagged T_MORE.
 *
 * Returns the number of bytes accepted, or a negative value on failure
 * with rpc_callerr filled in (-2 from nb_send() maps to RPC_CANTSTORE).
 */
static int
write_vc(void *ct_tmp, caddr_t buf, int len)
{
	struct ct_data *ct = ct_tmp;
	const int tsdu = ct->ct_tsdu;
	int remaining;
	int sent;

	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		if (tsdu > 0 && len > tsdu) {
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}

		len = nb_send(ct, buf, (unsigned)len);
		switch (len) {
		case -1:
			rpc_callerr.re_terrno = errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			break;
		case -2:
			rpc_callerr.re_terrno = 0;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSTORE;
			break;
		default:
			break;
		}
		return (len);
	}

	/* Transport without a usable size limit: send in one call. */
	if (tsdu == 0 || tsdu == -1) {
		len = t_snd(ct->ct_fd, buf, (unsigned)len, 0);
		if (len == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
		}
		return (len);
	}

	/* Size-limited transport: send piecewise, T_MORE on all but the last. */
	remaining = len;
	while (remaining > 0) {
		const int more = (remaining > tsdu) ? T_MORE : 0;

		sent = t_snd(ct->ct_fd, buf, (unsigned)MIN(remaining, tsdu),
		    more);
		if (sent == -1) {
			rpc_callerr.re_terrno = t_errno;
			rpc_callerr.re_errno = 0;
			rpc_callerr.re_status = RPC_CANTSEND;
			return (-1);
		}
		remaining -= sent;
		buf += sent;
	}
	return (len);
}
/*
 * Receive up to len bytes from fd into buf, calling t_rcv() again for
 * as long as the transport flags more data (T_MORE) and room remains.
 *
 * Returns the total number of bytes received, 0 if t_rcv() returned 0,
 * or -1 on any t_rcv() failure.  When the failure is an asynchronous
 * event (TLOOK), a disconnect or orderly release is acknowledged before
 * returning.
 */
static int
t_rcvall(int fd, char *buf, int len)
{
	int moreflag;
	int final = 0;
	int res;

	do {
		moreflag = 0;
		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
		if (res == -1) {
			if (t_errno == TLOOK) {
				switch (t_look(fd)) {
				case T_DISCONNECT:
					(void) t_rcvdis(fd, NULL);
					(void) t_snddis(fd, NULL);
					break;
				case T_ORDREL:
					/* Acknowledge the release, then send ours. */
					(void) t_rcvrel(fd);
					(void) t_sndrel(fd);
					break;
				default:
					break;
				}
			}
			/*
			 * Every failure ends the receive.  Falling through
			 * with res == -1 would subtract one from the byte
			 * count and move buf and len the wrong way.
			 */
			return (-1);
		}
		if (res == 0)
			return (0);

		final += res;
		buf += res;
		len -= res;
	} while ((len > 0) && (moreflag & T_MORE));
	return (final);
}
/*
 * Return the operations vector shared by all handles created in this
 * file.  The static table is filled in on the first call, under
 * ops_lock; later calls find cl_call set and return it unchanged.
 */
static struct clnt_ops *
clnt_vc_ops(void)
{
	static struct clnt_ops ops;
	extern mutex_t ops_lock;

	sig_mutex_lock(&ops_lock);
	if (ops.cl_call != NULL) {
		sig_mutex_unlock(&ops_lock);
		return (&ops);
	}

	ops.cl_call = clnt_vc_call;
	ops.cl_send = clnt_vc_send;
	ops.cl_abort = clnt_vc_abort;
	ops.cl_geterr = clnt_vc_geterr;
	ops.cl_freeres = clnt_vc_freeres;
	ops.cl_destroy = clnt_vc_destroy;
	ops.cl_control = clnt_vc_control;
	sig_mutex_unlock(&ops_lock);
	return (&ops);
}
/*
 * Range check for a timeout.  Returns non-zero when the value is NOT
 * acceptable: tv_sec outside [0, 100000000] or tv_usec outside
 * [0, 1000000]; returns zero when both fields are within limits.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	const int sec_ok = (t->tv_sec >= 0 && t->tv_sec <= 100000000);
	const int usec_ok = (t->tv_usec >= 0 && t->tv_usec <= 1000000);

	return (!(sec_ok && usec_ok));
}
#define REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
/*
 * Append nBytes from dataToAdd to the handle's circular pending buffer,
 * allocating the buffer (ct_bufferSize bytes) on first use.  When the
 * data does not fit between the write pointer and the end of the buffer,
 * the remainder is stored from the start of the buffer.  No check is
 * made here that the data fits in the free space; callers are expected
 * to have verified that.
 *
 * Returns 0 on success, or -1 with errno set to ENOMEM if the buffer
 * cannot be allocated.
 */
static int
addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
{
	int head;

	if (ct->ct_buffer == NULL) {
		/* First use: allocate and store the data at the start. */
		char *fresh = malloc(ct->ct_bufferSize);

		if (fresh == NULL) {
			errno = ENOMEM;
			return (-1);
		}
		(void) memcpy(fresh, dataToAdd, nBytes);
		ct->ct_buffer = fresh;
		ct->ct_bufferReadPtr = fresh;
		ct->ct_bufferWritePtr = fresh + nBytes;
		ct->ct_bufferPendingSize = nBytes;
		return (0);
	}

	/* First piece: whatever fits before the end of the buffer. */
	head = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
	ct->ct_bufferPendingSize += nBytes;
	(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, head);
	ct->ct_bufferWritePtr += head;
	nBytes -= head;

	if (nBytes != 0) {
		/* Second piece: the rest goes at the start of the buffer. */
		(void) memcpy(ct->ct_buffer, dataToAdd + head, nBytes);
		ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
	} else if (ct->ct_bufferWritePtr ==
	    (ct->ct_buffer + ct->ct_bufferSize)) {
		/* Exactly filled to the end: wrap the write pointer now. */
		ct->ct_bufferWritePtr = ct->ct_buffer;
	}
	return (0);
}
/*
 * Mark nBytes at the front of the pending buffer as sent.  When the
 * buffer becomes empty both pointers are reset to its start; otherwise
 * the read pointer advances, wrapping around the end of the buffer.
 */
static void
consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
{
	ct->ct_bufferPendingSize -= nBytes;
	if (ct->ct_bufferPendingSize == 0) {
		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
	} else {
		ct->ct_bufferReadPtr += nBytes;
		/*
		 * Wrap as soon as the end is reached (>=, not >), just as
		 * addInBuffer() wraps the write pointer.  Otherwise the read
		 * pointer rests one past the end and the next send starts
		 * with an empty first fragment.
		 */
		if (ct->ct_bufferReadPtr >=
		    ct->ct_buffer + ct->ct_bufferSize) {
			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
		}
	}
}
/*
 * Describe the pending data as an I/O vector.  Returns the number of
 * entries filled in: 0 when nothing is pending, 1 when the data is
 * contiguous, 2 when it wraps around the end of the buffer (the first
 * entry runs to the end, the second restarts at the beginning).  iov
 * must have room for two entries.
 */
static int
iovFromBuffer(struct ct_data *ct, struct iovec *iov)
{
	int tail;

	if (ct->ct_bufferPendingSize == 0)
		return (0);

	tail = REMAIN_BYTES(bufferReadPtr);
	iov[0].iov_base = ct->ct_bufferReadPtr;

	if (tail < ct->ct_bufferPendingSize) {
		/* Wrapped: two fragments. */
		iov[0].iov_len = tail;
		iov[1].iov_base = ct->ct_buffer;
		iov[1].iov_len = ct->ct_bufferPendingSize - tail;
		return (2);
	}

	/* Contiguous: one fragment. */
	iov[0].iov_len = ct->ct_bufferPendingSize;
	return (1);
}
/*
 * Record the flush mode for the handle.  Accepts
 * RPC_CL_BLOCKING_FLUSH, RPC_CL_BESTEFFORT_FLUSH and
 * RPC_CL_DEFAULT_FLUSH; any other value is rejected with FALSE and the
 * stored mode is left unchanged.
 */
static bool_t
set_flush_mode(struct ct_data *ct, int mode)
{
	if (mode != RPC_CL_BLOCKING_FLUSH &&
	    mode != RPC_CL_BESTEFFORT_FLUSH &&
	    mode != RPC_CL_DEFAULT_FLUSH) {
		return (FALSE);
	}

	ct->ct_blocking_mode = mode;
	return (TRUE);
}
/*
 * Switch the handle between RPC_CL_BLOCKING and RPC_CL_NONBLOCKING I/O.
 *
 * Going to blocking mode flushes and releases the pending buffer and
 * removes the handle from the exit-time flush list; going to
 * non-blocking mode registers it on that list.  Asking for the mode
 * already in effect does nothing.
 *
 * Returns TRUE on success, FALSE for an unknown mode or when
 * registration fails.
 */
static bool_t
set_io_mode(struct ct_data *ct, int ioMode)
{
	switch (ioMode) {
	case RPC_CL_BLOCKING:
		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
			if (NULL != ct->ct_buffer) {
				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
				free(ct->ct_buffer);
				ct->ct_buffer = NULL;
				/*
				 * Drop every reference to the freed storage.
				 * If the flush failed, a stale pending count
				 * and stale pointers would otherwise be used
				 * by a later non-blocking send.
				 */
				ct->ct_bufferReadPtr = NULL;
				ct->ct_bufferWritePtr = NULL;
				ct->ct_bufferPendingSize = 0;
			}
			(void) unregister_nb(ct);
			ct->ct_io_mode = ioMode;
		}
		break;
	case RPC_CL_NONBLOCKING:
		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
			if (-1 == register_nb(ct)) {
				return (FALSE);
			}
			ct->ct_io_mode = ioMode;
		}
		break;
	default:
		return (FALSE);
	}
	return (TRUE);
}
static int
do_flush(struct ct_data *ct, uint_t flush_mode)
{
int result;
if (ct->ct_bufferPendingSize == 0) {
return (0);
}
switch (flush_mode) {
case RPC_CL_BLOCKING_FLUSH:
if (!set_blocking_connection(ct, TRUE)) {
return (-1);
}
while (ct->ct_bufferPendingSize > 0) {
if (REMAIN_BYTES(bufferReadPtr) <
ct->ct_bufferPendingSize) {
struct iovec iov[2];
(void) iovFromBuffer(ct, iov);
result = writev(ct->ct_fd, iov, 2);
} else {
result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
ct->ct_bufferPendingSize, 0);
}
if (result < 0) {
return (-1);
}
consumeFromBuffer(ct, result);
}
break;
case RPC_CL_BESTEFFORT_FLUSH:
(void) set_blocking_connection(ct, FALSE);
if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
struct iovec iov[2];
(void) iovFromBuffer(ct, iov);
result = writev(ct->ct_fd, iov, 2);
} else {
result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
ct->ct_bufferPendingSize, 0);
}
if (result < 0) {
if (errno != EWOULDBLOCK) {
perror("flush");
return (-1);
}
return (0);
}
if (result > 0)
consumeFromBuffer(ct, result);
break;
}
return (0);
}
/* True when the most significant bit of the leading 32-bit word (network order) at x is set. */
#define LAST_FRAG(x) ((ntohl(*(uint32_t *)x) & (1U << 31)) == (1U << 31))
/*
 * Non-blocking send of one record.
 *
 * Tries to send the record immediately, after any data already pending;
 * whatever cannot be sent now is stored in the handle's buffer for a
 * later flush.
 *
 * Returns nBytes when the record was sent and/or stored in full, -1 on
 * error (including a buffer that does not start with a last-fragment
 * marker), and -2 when the record does not fit in the pending buffer
 * even after a best-effort flush.
 */
static int
nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
{
int result;
/* Only buffers that begin with a last-fragment marker are accepted. */
if (!LAST_FRAG(buff)) {
return (-1);
}
/*
 * The whole record must fit in the free space up front, because it may
 * turn out that nothing can be sent right now.
 */
if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
return (-2);
}
(void) set_blocking_connection(ct, FALSE);
if (ct->ct_bufferPendingSize == 0) {
/* Nothing queued ahead of us: try to send the record directly. */
result = t_snd(ct->ct_fd, buff, nBytes, 0);
if (result == -1) {
if (errno == EWOULDBLOCK) {
result = 0;
} else {
perror("send");
return (-1);
}
}
/* Store whatever part of the record was not sent. */
if (result != nBytes) {
if (addInBuffer(ct, (char *)buff + result,
nBytes - result) == -1) {
return (-1);
}
}
} else {
/*
 * Data is already pending: send it together with the new record in
 * one writev().  The pending data takes at most two entries.
 */
struct iovec iov[3];
int i = iovFromBuffer(ct, &iov[0]);
iov[i].iov_base = buff;
iov[i].iov_len = nBytes;
result = writev(ct->ct_fd, iov, i+1);
if (result == -1) {
if (errno == EWOULDBLOCK) {
result = 0;
} else {
return (-1);
}
}
if (result <= ct->ct_bufferPendingSize) {
/* Only previously pending data went out: queue the whole record. */
consumeFromBuffer(ct, result);
if (addInBuffer(ct, buff, nBytes) == -1) {
return (-1);
}
} else {
/*
 * All pending data and 'len' bytes of the record went out: empty the
 * buffer and queue the remainder of the record, if any.
 */
int len = result - ct->ct_bufferPendingSize;
ct->ct_bufferReadPtr = ct->ct_buffer;
ct->ct_bufferWritePtr = ct->ct_buffer;
ct->ct_bufferPendingSize = 0;
if (len != nBytes) {
if (addInBuffer(ct, (char *)buff + len,
nBytes-len) == -1) {
return (-1);
}
}
}
}
return (nBytes);
}
static void
flush_registered_clients(void)
{
struct nb_reg_node *node;
if (LIST_ISEMPTY(nb_first)) {
return;
}
LIST_FOR_EACH(nb_first, node) {
(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
}
}
/*
 * Allocate CHUNK_SIZE list nodes in one block, chain them together and
 * make them the free list.  nb_free is overwritten, so this is only
 * appropriate when the free list is empty.  Returns 0 on success and -1
 * if the allocation fails.
 */
static int
allocate_chunk(void)
{
#define CHUNK_SIZE 16
	struct nb_reg_node *chunk;
	struct nb_reg_node *following;
	int idx;

	chunk = malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
	if (chunk == NULL) {
		return (-1);
	}

	/* Link from the back: the last node points at the list sentinel. */
	following = (struct nb_reg_node *)&nb_free;
	for (idx = CHUNK_SIZE - 1; idx >= 0; idx--) {
		chunk[idx].next = following;
		following = &chunk[idx];
	}
	nb_free = chunk;
	return (0);
}
/*
 * Add ct to the list of non-blocking handles that are flushed at
 * process exit.  A node is taken from the free list (which is refilled
 * when empty), and the exit handler is installed on first use.
 *
 * Returns 0 on success, or -1 with errno set to ENOMEM when no node can
 * be allocated.
 */
static int
register_nb(struct ct_data *ct)
{
	struct nb_reg_node *node;

	(void) mutex_lock(&nb_list_mutex);

	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
		(void) mutex_unlock(&nb_list_mutex);
		errno = ENOMEM;
		return (-1);
	}

	if (!exit_handler_set) {
		(void) atexit(flush_registered_clients);
		exit_handler_set = TRUE;
	}

	LIST_EXTRACT(nb_free, node);
	node->ct = ct;

	/*
	 * Push at the front, linking explicitly to the current head.
	 * Linking to the head's successor instead would drop the handle
	 * that was registered previously.
	 */
	node->next = nb_first;
	nb_first = node;

	(void) mutex_unlock(&nb_list_mutex);
	return (0);
}
static int
unregister_nb(struct ct_data *ct)
{
struct nb_reg_node *node;
(void) mutex_lock(&nb_list_mutex);
assert(!LIST_ISEMPTY(nb_first));
node = nb_first;
LIST_FOR_EACH(nb_first, node) {
if (node->next->ct == ct) {
struct nb_reg_node *n = node->next;
node->next = n->next;
n->ct = NULL;
LIST_ADD(nb_free, n);
break;
}
}
(void) mutex_unlock(&nb_list_mutex);
return (0);
}