#include <sys/types.h>
#include <sys/sysmacros.h>
#include <sys/param.h>
#include <sys/errno.h>
#include <sys/signal.h>
#include <sys/proc.h>
#include <sys/conf.h>
#include <sys/cred.h>
#include <sys/user.h>
#include <sys/vnode.h>
#include <sys/file.h>
#include <sys/session.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/stropts.h>
#include <sys/poll.h>
#include <sys/systm.h>
#include <sys/cpuvar.h>
#include <sys/uio.h>
#include <sys/cmn_err.h>
#include <sys/priocntl.h>
#include <sys/procset.h>
#include <sys/vmem.h>
#include <sys/bitmap.h>
#include <sys/kmem.h>
#include <sys/siginfo.h>
#include <sys/vtrace.h>
#include <sys/callb.h>
#include <sys/debug.h>
#include <sys/modctl.h>
#include <sys/vmsystm.h>
#include <vm/page.h>
#include <sys/atomic.h>
#include <sys/suntpi.h>
#include <sys/strlog.h>
#include <sys/promif.h>
#include <sys/project.h>
#include <sys/vm.h>
#include <sys/taskq.h>
#include <sys/sunddi.h>
#include <sys/sunldi_impl.h>
#include <sys/strsun.h>
#include <sys/isa_defs.h>
#include <sys/pattr.h>
#include <sys/strft.h>
#include <sys/fs/snode.h>
#include <sys/zone.h>
#include <sys/open.h>
#include <sys/sunldi.h>
#include <sys/sad.h>
#include <sys/netstack.h>
/*
 * O_SAMESTR(q): non-zero when q has a successor (q_next) and q and that
 * successor carry the same QREADR bit in q_flag, i.e. both are read-side
 * queues or both are not.
 */
#define O_SAMESTR(q) (((q)->q_next) && \
(((q)->q_flag & QREADR) == ((q)->q_next->q_flag & QREADR)))
/*
 * Last multiplexor link index handed out; alloclink() pre-increments it
 * while holding strresources.
 */
static int32_t lnk_id = 0;
/*
 * Priority passed to thread_create() for the STREAMS background threads
 * started by strinit().
 */
#define STREAMS_LOPRI MINCLSYSPRI
static pri_t streams_lopri = STREAMS_LOPRI;
/*
 * Bump the 64-bit counter named x in str_statistics.  The macro is a plain
 * post-increment and takes no lock itself.
 */
#define STRSTAT(x) (str_statistics.x.value.ui64++)
/*
 * Layout of the STREAMS named-kstat counters.  One kstat_named_t per
 * counter; the member order has to match the positional initializer of
 * str_statistics.  Counters are incremented through STRSTAT().
 */
typedef struct str_stat {
kstat_named_t sqenables;
kstat_named_t stenables;
kstat_named_t syncqservice;
kstat_named_t freebs;
kstat_named_t qwr_outer;
kstat_named_t rservice;
kstat_named_t strwaits;
kstat_named_t taskqfails;
kstat_named_t bufcalls;
/* Incremented by remove_runlist() each time it unlinks a queue. */
kstat_named_t qremoved;
kstat_named_t sqremoved;
kstat_named_t bcwaits;
kstat_named_t sqtoomany;
} str_stat_t;
/*
 * The counter instance itself.  strinit() publishes it as the data area of
 * the "strstat" kstat (ks_data points here).  Every entry is declared as a
 * 64-bit unsigned named statistic; entries are positional and must stay in
 * the same order as the members of str_stat_t.
 */
static str_stat_t str_statistics = {
{ "sqenables", KSTAT_DATA_UINT64 },
{ "stenables", KSTAT_DATA_UINT64 },
{ "syncqservice", KSTAT_DATA_UINT64 },
{ "freebs", KSTAT_DATA_UINT64 },
{ "qwr_outer", KSTAT_DATA_UINT64 },
{ "rservice", KSTAT_DATA_UINT64 },
{ "strwaits", KSTAT_DATA_UINT64 },
{ "taskqfails", KSTAT_DATA_UINT64 },
{ "bufcalls", KSTAT_DATA_UINT64 },
{ "qremoved", KSTAT_DATA_UINT64 },
{ "sqremoved", KSTAT_DATA_UINT64 },
{ "bcwaits", KSTAT_DATA_UINT64 },
{ "sqtoomany", KSTAT_DATA_UINT64 },
};
/* Handle of the "strstat" kstat created in strinit(); may be NULL. */
static kstat_t *str_kstat;
char qrunflag;
/* Set to system_taskq by strinit(). */
static taskq_t *streams_taskq;
/*
 * service_queue is the mutex remove_runlist() holds while unlinking a queue
 * from the qhead/qtail list (linked through q_link).
 * NOTE(review): the condvars and the sqhead/sqtail/freebs_list lists are
 * presumably consumed by the background service threads; their users are
 * not visible in this part of the file.
 */
static kmutex_t service_queue;
static kcondvar_t services_to_run;
static kcondvar_t syncqs_to_run;
static struct queue *qhead;
static struct queue *qtail;
static syncq_t *sqhead;
static syncq_t *sqtail;
static mblk_t *freebs_list;
/* Background threads created by strinit(). */
kthread_t *streams_qbkgrnd_thread;
kthread_t *streams_sqbkgrnd_thread;
struct bclist strbcalls;
kmutex_t strbcall_lock;
kcondvar_t strbcall_cv;
kmutex_t bcall_monitor;
kcondvar_t bcall_cv;
/* Bufcall service thread created by strinit(). */
kthread_t *bc_bkgrnd_thread;
/* Held around every access to linkinfo_list and lnk_id in this file. */
kmutex_t strresources;
/* Held by mlink_file() across link set-up; munlink() asserts it is held. */
kmutex_t muxifier;
/* Per-netstack hooks registered for NS_STR by strinit(). */
static void *str_stack_init(netstackid_t stackid, netstack_t *ns);
static void str_stack_shutdown(netstackid_t stackid, void *arg);
static void str_stack_fini(netstackid_t stackid, void *arg);
int run_queues = 0;
int sq_max_size = 10000;
/*
 * Number of ciputctrl_t elements in each ciputctrl_cache object.  Computed
 * in strinit() from the CPU count and capped at max_n_ciputctrl; the cache
 * is only created when the result is at least min_n_ciputctrl.
 */
int n_ciputctrl;
int max_n_ciputctrl = 16;
int min_n_ciputctrl = 2;
perdm_t *perdm_list = NULL;
static krwlock_t perdm_rwlock;
/* Indexed by driver major number (see qattach() and mlink_file()). */
cdevsw_impl_t *devimpl;
/*
 * Stream head read/write qinit structures, defined elsewhere.  mlink_file()
 * uses them to restore the lower stream head's queues when a link attempt
 * fails.
 */
extern struct qinit strdata;
extern struct qinit stwdata;
/*
 * Forward declarations of file-local routines.  Most of the definitions lie
 * outside the portion of the file shown here.
 */
static void runservice(queue_t *);
static void streams_bufcall_service(void);
static void streams_qbkgrnd_service(void);
static void streams_sqbkgrnd_service(void);
static syncq_t *new_syncq(void);
static void free_syncq(syncq_t *);
static void outer_insert(syncq_t *, syncq_t *);
static void outer_remove(syncq_t *, syncq_t *);
static void write_now(syncq_t *);
static void clr_qfull(queue_t *);
static void runbufcalls(void);
static void sqenable(syncq_t *);
static void sqfill_events(syncq_t *, queue_t *, mblk_t *, void (*)());
static void wait_q_syncq(queue_t *);
static void backenable_insertedq(queue_t *);
static void queue_service(queue_t *);
static void stream_service(stdata_t *);
static void syncq_service(syncq_t *);
static void qwriter_outer_service(syncq_t *);
static void mblk_free(mblk_t *);
/* qprocsareon() is only referenced from ASSERT()s, hence DEBUG-only. */
#ifdef DEBUG
static int qprocsareon(queue_t *);
#endif
static void set_nfsrv_ptr(queue_t *, queue_t *, queue_t *, queue_t *);
static void reset_nfsrv_ptr(queue_t *, queue_t *);
void set_qfull(queue_t *);
static void sq_run_events(syncq_t *);
static int propagate_syncq(queue_t *);
static void blocksq(syncq_t *, ushort_t, int);
static void unblocksq(syncq_t *, ushort_t, int);
static int dropsq(syncq_t *, uint16_t);
static void emptysq(syncq_t *);
static sqlist_t *sqlist_alloc(struct stdata *, int);
static void sqlist_free(sqlist_t *);
static sqlist_t *sqlist_build(queue_t *, struct stdata *, boolean_t);
static void sqlist_insert(sqlist_t *, syncq_t *);
static void sqlist_insertall(sqlist_t *, queue_t *);
static void strsetuio(stdata_t *);
/*
 * Object caches created in strinit().  ciputctrl_cache stays NULL unless
 * n_ciputctrl turns out to be at least min_n_ciputctrl.
 */
struct kmem_cache *stream_head_cache;
struct kmem_cache *queue_cache;
struct kmem_cache *syncq_cache;
struct kmem_cache *qband_cache;
struct kmem_cache *linkinfo_cache;
struct kmem_cache *ciputctrl_cache = NULL;
/*
 * Head of the doubly linked (li_next/li_prev) list of link records;
 * maintained by alloclink()/lbfree() under strresources.
 */
static linkinfo_t *linkinfo_list;
/*
 * State and tunables for the esballoc free-processing queues.
 * NOTE(review): the code using these is outside this view; names suggest a
 * per-CPU-group queue array with a length threshold and a timeout -- confirm
 * against esballoc_enqueue_mblk()/esballoc_timer().
 */
static esb_queue_t system_esbq;
static esb_queue_t *volatile system_esbq_array;
static int esbq_nelem;
static kmutex_t esbq_lock;
static int esbq_log2_cpus_per_q = 0;
uint_t esbq_cpus_per_q = 1;
int esbq_max_qlen = 0x16;
clock_t esbq_timeout = 0x8;
static void esballoc_process_queue(esb_queue_t *);
static void esballoc_enqueue_mblk(mblk_t *);
static void esballoc_timer(void *);
static void esballoc_set_timer(esb_queue_t *, clock_t);
static void esballoc_mblk_free(mblk_t *);
/*
 * Pass-through queue support: put procedures for the temporary queue pair
 * that link_addpassthru() inserts and link_rempassthru() removes while
 * mlink_file()/munlink() replumb a lower stream.
 */
static int pass_rput(queue_t *, mblk_t *);
static int pass_wput(queue_t *, mblk_t *);
static queue_t *link_addpassthru(stdata_t *);
static void link_rempassthru(queue_t *);
/*
 * Module information shared by both pass-through qinit structures.
 * The initializer is positional; struct module_info is declared in a header
 * not shown here.  NOTE(review): by the usual STREAMS layout the fields are
 * presumably id number, name, min packet size, max packet size, high water
 * mark and low water mark -- confirm against <sys/stream.h>.
 */
struct module_info passthru_info = {
0,
"passthru",
0,
INFPSZ,
STRHIGH,
STRLOW
};
/*
 * Read-side qinit for the pass-through queue: only the first slot (pass_rput)
 * and the module_info pointer are populated, every other slot is NULL.
 * NOTE(review): slot meanings follow struct qinit, declared elsewhere; the
 * first slot is presumably the put procedure -- confirm against
 * <sys/stream.h>.
 */
struct qinit passthru_rinit = {
pass_rput,
NULL,
NULL,
NULL,
NULL,
&passthru_info,
NULL
};
/*
 * Write-side qinit for the pass-through queue: only the first slot
 * (pass_wput) and the module_info pointer are populated, every other slot is
 * NULL.  NOTE(review): slot meanings follow struct qinit, declared
 * elsewhere -- confirm against <sys/stream.h>.
 */
struct qinit passthru_winit = {
pass_wput,
NULL,
NULL,
NULL,
NULL,
&passthru_info,
NULL
};
/*
 * LISTCHECK(head, tail, link): consistency checks for a singly linked list
 * with head and tail pointers threaded through member `link':
 *   - head and tail are either both NULL or both non-NULL;
 *   - a non-NULL tail has a NULL `link'.
 * EQUIV() and IMPLY() are assertion helpers defined outside this file.
 * Arguments are pasted unparenthesized, so pass simple lvalues.
 */
#define LISTCHECK(head, tail, link) { \
EQUIV(head, tail); \
IMPLY(tail != NULL, tail->link == NULL); \
}
/*
 * ENQUEUE(el, head, tail, link): append `el' at the tail of the list.
 * `el' must not already be linked (its `link' is asserted NULL).  An empty
 * list gets `el' as its head; otherwise the old tail is chained to `el'.
 * In both cases `el' becomes the new tail.  The caller is responsible for
 * whatever locking protects head/tail.
 */
#define ENQUEUE(el, head, tail, link) { \
ASSERT(el->link == NULL); \
LISTCHECK(head, tail, link); \
if (head == NULL) \
head = el; \
else \
tail->link = el; \
tail = el; \
}
/*
 * DQ(el, head, tail, link): remove the first element of the list and store
 * it in `el'.  `el' is set to NULL when the list is empty.  When the last
 * element is removed, tail is reset to NULL as well.  The removed element's
 * `link' is cleared.
 */
#define DQ(el, head, tail, link) { \
LISTCHECK(head, tail, link); \
el = head; \
if (head != NULL) { \
head = head->link; \
if (head == NULL) \
tail = NULL; \
el->link = NULL; \
} \
}
/*
 * RMQ(el, head, tail, link, chase, curr, succeed): search the list for `el'
 * and unlink it if present.
 *   chase, curr - caller-supplied scratch pointers; after the walk `curr' is
 *                 the found element (or NULL) and `chase' its predecessor
 *                 (or NULL when `el' was the head).
 *   succeed     - set to 1 when `el' was found and removed, otherwise 0.
 * On removal the element's `link' is cleared and head/tail are adjusted; if
 * `el' was the tail, the predecessor becomes the new tail.  The list
 * invariants are checked before and after the operation.
 */
#define RMQ(el, head, tail, link, chase, curr, succeed) { \
LISTCHECK(head, tail, link); \
chase = NULL; \
succeed = 0; \
for (curr = head; (curr != el) && (curr != NULL); curr = curr->link) \
chase = curr; \
if (curr != NULL) { \
succeed = 1; \
ASSERT(curr == el); \
if (chase != NULL) \
chase->link = curr->link; \
else \
head = curr->link; \
curr->link = NULL; \
if (curr == tail) \
tail = chase; \
} \
LISTCHECK(head, tail, link); \
}
/*
 * SQPUT_Q(sq, qp): put queue `qp' on the doubly linked (q_sqnext/q_sqprev)
 * list of queues hanging off syncq `sq'.  SQLOCK(sq) must be held.
 *
 * Nothing happens if qp is already flagged Q_SQQUEUED.  Otherwise, after a
 * series of sanity assertions (qp unlinked, qp belongs to sq, head/tail,
 * SQ_MESSAGES, sq_pri and sq_nqueues all mutually consistent):
 *   - an empty list gets qp as both head and tail and SQ_MESSAGES is set;
 *   - a qp with q_spri == 0 is appended at the tail;
 *   - any other qp is inserted by walking backwards from the tail past every
 *     queue whose q_spri is strictly lower than qp's.  qp lands after the
 *     first queue with equal or higher q_spri, or becomes the new head, in
 *     which case sq_pri is refreshed from it.
 * Finally qp is marked Q_SQQUEUED, q_sqtstamp is set to the current lbolt
 * value and sq_nqueues is incremented.
 *
 * The priority insertion walks through `queue_t **qpp', which starts at
 * &sq->sq_tail and then points at the q_sqprev field of the last queue
 * skipped; the final `*qpp = qp' therefore fixes up either sq_tail or that
 * queue's q_sqprev.
 */
#define SQPUT_Q(sq, qp) \
{ \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
if (!(qp->q_sqflags & Q_SQQUEUED)) { \
\
ASSERT((qp->q_sqprev == NULL) && (qp->q_sqnext == NULL)); \
\
EQUIV(sq->sq_head, sq->sq_tail); \
\
ASSERT(sq == qp->q_syncq); \
\
EQUIV(sq->sq_head, (sq->sq_flags & SQ_MESSAGES)); \
\
IMPLY(sq->sq_head != NULL, sq->sq_head->q_sqprev == NULL);\
IMPLY(sq->sq_tail != NULL, sq->sq_tail->q_sqnext == NULL);\
\
IMPLY(sq->sq_head == NULL, sq->sq_pri == 0); \
\
EQUIV(sq->sq_head, sq->sq_nqueues); \
if (sq->sq_head == NULL) { \
sq->sq_head = sq->sq_tail = qp; \
sq->sq_flags |= SQ_MESSAGES; \
} else if (qp->q_spri == 0) { \
qp->q_sqprev = sq->sq_tail; \
sq->sq_tail->q_sqnext = qp; \
sq->sq_tail = qp; \
} else { \
\
queue_t **qpp = &sq->sq_tail; \
queue_t *qnext = NULL; \
\
while (*qpp != NULL && qp->q_spri > (*qpp)->q_spri) { \
qnext = *qpp; \
qpp = &(*qpp)->q_sqprev; \
} \
qp->q_sqnext = qnext; \
qp->q_sqprev = *qpp; \
if (*qpp != NULL) { \
(*qpp)->q_sqnext = qp; \
} else { \
sq->sq_head = qp; \
sq->sq_pri = sq->sq_head->q_spri; \
} \
*qpp = qp; \
} \
qp->q_sqflags |= Q_SQQUEUED; \
qp->q_sqtstamp = ddi_get_lbolt(); \
sq->sq_nqueues++; \
} \
}
/*
 * SQRM_Q(sq, qp): take queue `qp' off syncq `sq''s queue list.
 * SQLOCK(sq) must be held and qp must currently be queued (Q_SQQUEUED).
 *
 * qp is unlinked from its neighbours (updating sq_head / sq_tail when it was
 * first / last), its link fields are cleared and Q_SQQUEUED is dropped.
 * Afterwards sq_pri is reloaded from the new head, or, if the list became
 * empty, SQ_MESSAGES is cleared and sq_pri reset to 0.  sq_nqueues is
 * decremented.  The closing assertion requires that a syncq left with no
 * queues and no events is not flagged SQ_QUEUED.
 */
#define SQRM_Q(sq, qp) \
{ \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
ASSERT(qp->q_sqflags & Q_SQQUEUED); \
ASSERT(sq->sq_head != NULL && sq->sq_tail != NULL); \
ASSERT((sq->sq_flags & SQ_MESSAGES) != 0); \
\
ASSERT(qp->q_sqnext != NULL || sq->sq_tail == qp); \
ASSERT(qp->q_sqprev != NULL || sq->sq_head == qp); \
ASSERT(sq->sq_nqueues != 0); \
if (qp->q_sqprev == NULL) { \
\
sq->sq_head = qp->q_sqnext; \
} else { \
\
qp->q_sqprev->q_sqnext = qp->q_sqnext; \
} \
if (qp->q_sqnext == NULL) { \
\
sq->sq_tail = qp->q_sqprev; \
} else { \
\
qp->q_sqnext->q_sqprev = qp->q_sqprev; \
} \
\
qp->q_sqprev = qp->q_sqnext = NULL; \
qp->q_sqflags &= ~Q_SQQUEUED; \
\
if (sq->sq_head != NULL) { \
sq->sq_pri = sq->sq_head->q_spri; \
} else { \
sq->sq_flags &= ~SQ_MESSAGES; \
sq->sq_pri = 0; \
} \
sq->sq_nqueues--; \
ASSERT(sq->sq_head != NULL || sq->sq_evhead != NULL || \
(sq->sq_flags & SQ_QUEUED) == 0); \
}
/*
 * SQPUT_MP(qp, mp): append message `mp' to the per-queue syncq message list
 * (q_sqhead/q_sqtail, chained through b_next).  QLOCK(qp) must be held.
 * q_syncqmsgs is incremented (and asserted not to have wrapped to zero),
 * then set_qfull(qp) is called.
 *
 * Any earlier definition of the name is discarded first so that this file's
 * version is the one in effect.
 */
#ifdef SQPUT_MP
#undef SQPUT_MP
#endif
#define SQPUT_MP(qp, mp) \
{ \
ASSERT(MUTEX_HELD(QLOCK(qp))); \
ASSERT(qp->q_sqhead == NULL || \
(qp->q_sqtail != NULL && \
qp->q_sqtail->b_next == NULL)); \
qp->q_syncqmsgs++; \
ASSERT(qp->q_syncqmsgs != 0); \
if (qp->q_sqhead == NULL) { \
qp->q_sqhead = qp->q_sqtail = mp; \
} else { \
qp->q_sqtail->b_next = mp; \
qp->q_sqtail = mp; \
} \
ASSERT(qp->q_syncqmsgs > 0); \
set_qfull(qp); \
}
/*
 * SQ_PUTCOUNT_SETFAST_LOCKED(sq): turn on SQ_FASTPUT in every ciputctrl
 * counter attached to `sq'.  A no-op when sq has no ciputctrl array.
 * SQLOCK(sq) and every ciputctrl_lock in the array must be held, and sq must
 * be of type SQ_CIPUT.
 * NOTE(review): the loop bound is inclusive (i <= sq_nciputctrl), so
 * sq_nciputctrl is treated as the highest valid index rather than an element
 * count -- confirm against where sq_nciputctrl is assigned.
 */
#define SQ_PUTCOUNT_SETFAST_LOCKED(sq) { \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
if ((sq)->sq_ciputctrl != NULL) { \
int i; \
int nlocks = (sq)->sq_nciputctrl; \
ciputctrl_t *cip = (sq)->sq_ciputctrl; \
ASSERT((sq)->sq_type & SQ_CIPUT); \
for (i = 0; i <= nlocks; i++) { \
ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
cip[i].ciputctrl_count |= SQ_FASTPUT; \
} \
} \
}
/*
 * SQ_PUTCOUNT_CLRFAST_LOCKED(sq): turn off SQ_FASTPUT in every ciputctrl
 * counter attached to `sq'.  A no-op when sq has no ciputctrl array.
 * SQLOCK(sq) and every ciputctrl_lock in the array must be held, and sq must
 * be of type SQ_CIPUT.
 * NOTE(review): the loop bound is inclusive (i <= sq_nciputctrl), so
 * sq_nciputctrl is treated as the highest valid index rather than an element
 * count -- confirm against where sq_nciputctrl is assigned.
 */
#define SQ_PUTCOUNT_CLRFAST_LOCKED(sq) { \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
if ((sq)->sq_ciputctrl != NULL) { \
int i; \
int nlocks = (sq)->sq_nciputctrl; \
ciputctrl_t *cip = (sq)->sq_ciputctrl; \
ASSERT((sq)->sq_type & SQ_CIPUT); \
for (i = 0; i <= nlocks; i++) { \
ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
cip[i].ciputctrl_count &= ~SQ_FASTPUT; \
} \
} \
}
/*
 * STR_SERVICE(stp, q): drain the stream's private list of scheduled queues
 * (sd_qhead/sd_qtail, linked through q_link).  sd_qlock must be held on
 * entry and is held again on exit.  For each queue the lock is dropped
 * around the call to queue_service(), so new queues may be appended while
 * one is being serviced; the loop keeps going until the list is empty.
 * `q' is a caller-supplied scratch variable that receives each dequeued
 * queue.  sd_nqueues is decremented per dequeue and asserted zero at the end.
 */
#define STR_SERVICE(stp, q) { \
ASSERT(MUTEX_HELD(&stp->sd_qlock)); \
while (stp->sd_qhead != NULL) { \
DQ(q, stp->sd_qhead, stp->sd_qtail, q_link); \
ASSERT(stp->sd_nqueues > 0); \
stp->sd_nqueues--; \
ASSERT(!(q->q_flag & QINSERVICE)); \
mutex_exit(&stp->sd_qlock); \
queue_service(q); \
mutex_enter(&stp->sd_qlock); \
} \
ASSERT(stp->sd_nqueues == 0); \
ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL)); \
}
/*
 * kmem cache constructor for stream head objects (stdata_t).
 *
 * Sets up the three mutexes and five condition variables embedded in the
 * stream head and clears its write queue pointer.  cdrarg and kmflags are
 * not used.  Always returns 0.
 */
static int
stream_head_constructor(void *buf, void *cdrarg, int kmflags)
{
	stdata_t *head = buf;

	/* A freshly constructed stream head has no write queue yet. */
	head->sd_wrq = NULL;

	/* Locks. */
	mutex_init(&head->sd_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&head->sd_reflock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&head->sd_qlock, NULL, MUTEX_DEFAULT, NULL);

	/* Condition variables. */
	cv_init(&head->sd_monitor, NULL, CV_DEFAULT, NULL);
	cv_init(&head->sd_iocmonitor, NULL, CV_DEFAULT, NULL);
	cv_init(&head->sd_refmonitor, NULL, CV_DEFAULT, NULL);
	cv_init(&head->sd_qcv, NULL, CV_DEFAULT, NULL);
	cv_init(&head->sd_zcopy_wait, NULL, CV_DEFAULT, NULL);

	return (0);
}
/*
 * kmem cache destructor for stream head objects (stdata_t).
 *
 * Tears down every mutex and condition variable that
 * stream_head_constructor() initialised.  cdrarg is not used.
 */
static void
stream_head_destructor(void *buf, void *cdrarg)
{
	stdata_t *head = buf;

	/* Condition variables. */
	cv_destroy(&head->sd_monitor);
	cv_destroy(&head->sd_iocmonitor);
	cv_destroy(&head->sd_refmonitor);
	cv_destroy(&head->sd_qcv);
	cv_destroy(&head->sd_zcopy_wait);

	/* Locks. */
	mutex_destroy(&head->sd_lock);
	mutex_destroy(&head->sd_reflock);
	mutex_destroy(&head->sd_qlock);
}
/*
 * kmem cache constructor for queinfo_t objects, each of which bundles a read
 * queue, a write queue and a syncq.
 *
 * Both queues get the same treatment: list/count/syncq-linkage fields are
 * cleared and the queue lock and wait condvar are initialised.  The embedded
 * syncq then has its list heads, outer-perimeter links, service state and
 * priority cleared and its lock and condvars initialised.
 * cdrarg and kmflags are not used.  Always returns 0.
 */
static int
queue_constructor(void *buf, void *cdrarg, int kmflags)
{
	queinfo_t *qinfo = buf;
	syncq_t *sq = &qinfo->qu_syncq;
	queue_t *pair[2];
	int side;

	/* Read queue first, then write queue. */
	pair[0] = &qinfo->qu_rqueue;
	pair[1] = &qinfo->qu_wqueue;

	for (side = 0; side < 2; side++) {
		queue_t *q = pair[side];

		q->q_first = NULL;
		q->q_link = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		q->q_sqhead = NULL;
		q->q_sqtail = NULL;
		q->q_sqnext = NULL;
		q->q_sqprev = NULL;
		q->q_sqflags = 0;
		q->q_rwcnt = 0;
		q->q_spri = 0;

		mutex_init(QLOCK(q), NULL, MUTEX_DEFAULT, NULL);
		cv_init(&q->q_wait, NULL, CV_DEFAULT, NULL);
	}

	/* Embedded syncq: queue list, event list, callbacks. */
	sq->sq_head = NULL;
	sq->sq_tail = NULL;
	sq->sq_evhead = NULL;
	sq->sq_evtail = NULL;
	sq->sq_callbpend = NULL;

	/* Outer perimeter linkage and background list link. */
	sq->sq_outer = NULL;
	sq->sq_onext = NULL;
	sq->sq_oprev = NULL;
	sq->sq_next = NULL;

	/* Service bookkeeping. */
	sq->sq_svcflags = 0;
	sq->sq_servcount = 0;
	sq->sq_needexcl = 0;
	sq->sq_nqueues = 0;
	sq->sq_pri = 0;

	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);

	return (0);
}
/*
 * kmem cache destructor for queinfo_t objects.
 *
 * Asserts that neither queue still has syncq messages, syncq list linkage or
 * a non-zero q_rwcnt, then destroys the locks and condition variables of
 * both queues and of the embedded syncq.  cdrarg is not used.
 */
static void
queue_destructor(void *buf, void *cdrarg)
{
queinfo_t *qip = buf;
queue_t *qp = &qip->qu_rqueue;
queue_t *wqp = &qip->qu_wqueue;
syncq_t *sq = &qip->qu_syncq;
/* Both queues must be idle and unlinked by the time they are destroyed. */
ASSERT(qp->q_sqhead == NULL);
ASSERT(wqp->q_sqhead == NULL);
ASSERT(qp->q_sqnext == NULL);
ASSERT(wqp->q_sqnext == NULL);
ASSERT(qp->q_rwcnt == 0);
ASSERT(wqp->q_rwcnt == 0);
/* Mirror of the mutex_init()/cv_init() calls in queue_constructor(). */
mutex_destroy(&qp->q_lock);
cv_destroy(&qp->q_wait);
mutex_destroy(&wqp->q_lock);
cv_destroy(&wqp->q_wait);
mutex_destroy(&sq->sq_lock);
cv_destroy(&sq->sq_wait);
cv_destroy(&sq->sq_exitwait);
}
/*
 * kmem cache constructor for stand-alone syncq_t objects.
 *
 * Zero-fills the whole structure, then initialises its lock and its two
 * condition variables.  cdrarg and kmflags are not used.  Always returns 0.
 */
static int
syncq_constructor(void *buf, void *cdrarg, int kmflags)
{
	syncq_t *sq = buf;

	/* Start from an all-zero syncq; the destructor asserts this state. */
	bzero(sq, sizeof (*sq));

	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);
	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);

	return (0);
}
/*
 * kmem cache destructor for stand-alone syncq_t objects.
 *
 * Asserts that the syncq has been returned to its pristine, all-zero state
 * (no queued queues or events, no pending callbacks, no outer perimeter
 * linkage, no counts, type or flags) and then destroys its lock and
 * condition variables.  cdrarg is not used.
 */
static void
syncq_destructor(void *buf, void *cdrarg)
{
syncq_t *sq = buf;
/* Queue list and event list must be empty. */
ASSERT(sq->sq_head == NULL);
ASSERT(sq->sq_tail == NULL);
ASSERT(sq->sq_evhead == NULL);
ASSERT(sq->sq_evtail == NULL);
/* No callbacks outstanding. */
ASSERT(sq->sq_callbpend == NULL);
ASSERT(sq->sq_callbflags == 0);
/* Not linked into an outer perimeter or onto the background list. */
ASSERT(sq->sq_outer == NULL);
ASSERT(sq->sq_onext == NULL);
ASSERT(sq->sq_oprev == NULL);
ASSERT(sq->sq_next == NULL);
/* No service activity or residual counts. */
ASSERT(sq->sq_needexcl == 0);
ASSERT(sq->sq_svcflags == 0);
ASSERT(sq->sq_servcount == 0);
ASSERT(sq->sq_nqueues == 0);
ASSERT(sq->sq_pri == 0);
ASSERT(sq->sq_count == 0);
ASSERT(sq->sq_rmqcount == 0);
ASSERT(sq->sq_cancelid == 0);
/* Per-CPU put counters detached, type and flags cleared. */
ASSERT(sq->sq_ciputctrl == NULL);
ASSERT(sq->sq_nciputctrl == 0);
ASSERT(sq->sq_type == 0);
ASSERT(sq->sq_flags == 0);
mutex_destroy(&sq->sq_lock);
cv_destroy(&sq->sq_wait);
cv_destroy(&sq->sq_exitwait);
}
/*
 * kmem cache constructor for ciputctrl arrays.
 *
 * Each cache object is an array of n_ciputctrl ciputctrl_t elements.  Every
 * element starts with its count set to SQ_FASTPUT and an initialised lock.
 * cdrarg and kmflags are not used.  Always returns 0.
 */
static int
ciputctrl_constructor(void *buf, void *cdrarg, int kmflags)
{
	ciputctrl_t *slot = buf;
	int idx = 0;

	while (idx < n_ciputctrl) {
		slot->ciputctrl_count = SQ_FASTPUT;
		mutex_init(&slot->ciputctrl_lock, NULL, MUTEX_DEFAULT, NULL);
		slot++;
		idx++;
	}

	return (0);
}
/*
 * kmem cache destructor for ciputctrl arrays.
 *
 * Walks the n_ciputctrl elements of the object, asserting that each one
 * still has SQ_FASTPUT set in its count, and destroys each element's lock.
 * cdrarg is not used.
 */
static void
ciputctrl_destructor(void *buf, void *cdrarg)
{
	ciputctrl_t *cip = buf;
	int i = 0;

	while (i < n_ciputctrl) {
		ASSERT(cip[i].ciputctrl_count & SQ_FASTPUT);
		mutex_destroy(&cip[i].ciputctrl_lock);
		i++;
	}
}
/*
 * One-time STREAMS subsystem initialisation: creates the object caches,
 * sizes and (conditionally) creates the ciputctrl cache, starts the three
 * background service threads, publishes the statistics kstat, initialises
 * TPI support and registers the per-netstack STREAMS hooks.
 */
void
strinit(void)
{
/* Use boot_max_ncpus unless it is -1, in which case fall back to max_ncpus. */
int ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);
stream_head_cache = kmem_cache_create("stream_head_cache",
sizeof (stdata_t), 0,
stream_head_constructor, stream_head_destructor, NULL,
NULL, NULL, 0);
queue_cache = kmem_cache_create("queue_cache", sizeof (queinfo_t), 0,
queue_constructor, queue_destructor, NULL, NULL, NULL, 0);
syncq_cache = kmem_cache_create("syncq_cache", sizeof (syncq_t), 0,
syncq_constructor, syncq_destructor, NULL, NULL, NULL, 0);
/* qband and linkinfo objects need no constructor/destructor. */
qband_cache = kmem_cache_create("qband_cache",
sizeof (qband_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
linkinfo_cache = kmem_cache_create("linkinfo_cache",
sizeof (linkinfo_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
/*
 * Derive the per-object element count for ciputctrl_cache from the CPU
 * count, capped at max_n_ciputctrl.
 * NOTE(review): 1 << highbit(n - 1) presumably rounds n up to a power of
 * two; this depends on highbit() semantics not shown here -- confirm.
 */
n_ciputctrl = ncpus;
n_ciputctrl = 1 << highbit(n_ciputctrl - 1);
ASSERT(n_ciputctrl >= 1);
n_ciputctrl = MIN(n_ciputctrl, max_n_ciputctrl);
/* Below the minimum the cache is not created and ciputctrl_cache stays NULL. */
if (n_ciputctrl >= min_n_ciputctrl) {
ciputctrl_cache = kmem_cache_create("ciputctrl_cache",
sizeof (ciputctrl_t) * n_ciputctrl,
sizeof (ciputctrl_t), ciputctrl_constructor,
ciputctrl_destructor, NULL, NULL, NULL, 0);
}
/* STREAMS shares the system task queue; it must already exist. */
streams_taskq = system_taskq;
if (streams_taskq == NULL)
panic("strinit: no memory for streams taskq!");
/* Background threads, all owned by p0 and started at streams_lopri. */
bc_bkgrnd_thread = thread_create(NULL, 0,
streams_bufcall_service, NULL, 0, &p0, TS_RUN, streams_lopri);
streams_qbkgrnd_thread = thread_create(NULL, 0,
streams_qbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);
streams_sqbkgrnd_thread = thread_create(NULL, 0,
streams_sqbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);
/*
 * Virtual named kstat whose data area is str_statistics itself; the entry
 * count is derived from the structure size.  Failure to create it is
 * tolerated (statistics are simply not exported).
 */
str_kstat = kstat_create("streams", 0, "strstat",
"net", KSTAT_TYPE_NAMED,
sizeof (str_statistics) / sizeof (kstat_named_t),
KSTAT_FLAG_VIRTUAL);
if (str_kstat != NULL) {
str_kstat->ks_data = &str_statistics;
kstat_install(str_kstat);
}
tpi_init();
/* Per-netstack STREAMS state is created/destroyed through these hooks. */
netstack_register(NS_STR, str_stack_init, str_stack_shutdown,
str_stack_fini);
}
/*
 * Vnode-level entry point for posting stream signals.
 *
 * Looks up the stream attached to vp (which must exist) and, with sd_lock
 * held, forwards the request to strsendsig() provided at least one of the
 * requested event bits is present in the stream's sd_sigflags.
 *
 *	vp	vnode with an attached stream
 *	event	S_* event bit(s) being reported
 *	band	priority band passed through to strsendsig()
 *	error	error value passed through to strsendsig()
 */
void
str_sendsig(vnode_t *vp, int event, uchar_t band, int error)
{
	struct stdata *head;

	ASSERT(vp->v_stream);
	head = vp->v_stream;

	mutex_enter(&head->sd_lock);
	if ((head->sd_sigflags & event) != 0) {
		strsendsig(head->sd_siglist, event, band, error);
	}
	mutex_exit(&head->sd_lock);
}
/*
 * Deliver the signals for one process.
 *
 *	proc	target process; its p_lock must be held by the caller
 *	events	the full set of events the registrant asked for (only consulted
 *		for S_BANDURG when handling S_RDBAND)
 *	sevent	the subset of events that actually occurred; each recognised
 *		bit is handled and cleared in turn
 *	info	caller-owned siginfo, reused for every queued signal; si_code,
 *		si_band and si_errno are overwritten here
 *	band	priority band reported for S_INPUT, S_OUTPUT and S_MSG
 *	error	errno reported for S_ERROR
 *
 * S_ERROR, S_HANGUP, S_HIPRI, S_INPUT, S_OUTPUT and S_MSG are queued with
 * sigaddq() using the corresponding POLL_* code.  S_RDBAND, S_WRBAND and
 * S_RDNORM are posted with sigtoproc() and carry no siginfo.  Any bit left
 * in sevent after all of these is considered a programming error and panics.
 */
static void
dosendsig(proc_t *proc, int events, int sevent, k_siginfo_t *info,
uchar_t band, int error)
{
ASSERT(MUTEX_HELD(&proc->p_lock));
/* band/errno are only non-zero for the events that explicitly set them. */
info->si_band = 0;
info->si_errno = 0;
if (sevent & S_ERROR) {
sevent &= ~S_ERROR;
info->si_code = POLL_ERR;
info->si_errno = error;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
/* Reset so later events do not inherit the error. */
info->si_errno = 0;
}
if (sevent & S_HANGUP) {
sevent &= ~S_HANGUP;
info->si_code = POLL_HUP;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
}
if (sevent & S_HIPRI) {
sevent &= ~S_HIPRI;
info->si_code = POLL_PRI;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
}
if (sevent & S_RDBAND) {
sevent &= ~S_RDBAND;
/* Registrants that asked for S_BANDURG get SIGURG instead of SIGPOLL. */
if (events & S_BANDURG)
sigtoproc(proc, NULL, SIGURG);
else
sigtoproc(proc, NULL, SIGPOLL);
}
if (sevent & S_WRBAND) {
sevent &= ~S_WRBAND;
sigtoproc(proc, NULL, SIGPOLL);
}
if (sevent & S_INPUT) {
sevent &= ~S_INPUT;
info->si_code = POLL_IN;
info->si_band = band;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
info->si_band = 0;
}
if (sevent & S_OUTPUT) {
sevent &= ~S_OUTPUT;
info->si_code = POLL_OUT;
info->si_band = band;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
info->si_band = 0;
}
if (sevent & S_MSG) {
sevent &= ~S_MSG;
info->si_code = POLL_MSG;
info->si_band = band;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
info->si_band = 0;
}
if (sevent & S_RDNORM) {
sevent &= ~S_RDNORM;
sigtoproc(proc, NULL, SIGPOLL);
}
/* Every known bit has been cleared by now; anything left is unknown. */
if (sevent != 0) {
panic("strsendsig: unknown event(s) %x", sevent);
}
}
/*
 * Notify every registrant on `siglist' that is interested in `event'.
 *
 *	siglist	head of the stream's strsig_t list (linked via ss_next)
 *	event	S_* event bit(s) that occurred
 *	band	priority band handed to dosendsig()
 *	error	errno handed to dosendsig() (used for S_ERROR)
 *
 * For each entry whose ss_events intersects `event':
 *   - entries whose ss_pidp is NULL are skipped;
 *   - a positive ss_pid names a single process, located with prfind_zone()
 *     under pidlock; pidlock is dropped once the process's p_lock is held;
 *   - otherwise -ss_pid names a process group; pidlock stays held while the
 *     group's p_pglink chain is walked and each member is signalled.
 *
 * The siginfo passed to dosendsig() lives on this function's stack and is
 * copied by sigaddq().  It is zero-filled before use so that members which
 * neither this function nor dosendsig() assigns never carry stale stack
 * contents into the queued signal.
 */
void
strsendsig(strsig_t *siglist, int event, uchar_t band, int error)
{
	strsig_t *ssp;
	k_siginfo_t info;
	struct pid *pidp;
	proc_t *proc;

	/* Fully initialise the siginfo; only a few members are set below. */
	bzero(&info, sizeof (info));
	info.si_signo = SIGPOLL;
	info.si_errno = 0;

	for (ssp = siglist; ssp != NULL; ssp = ssp->ss_next) {
		int sevent;

		/* Only the events this registrant asked for. */
		sevent = ssp->ss_events & event;
		if (sevent == 0)
			continue;

		/* Entry still on the list but no longer has a pid attached. */
		if ((pidp = ssp->ss_pidp) == NULL)
			continue;

		if (ssp->ss_pid > 0) {
			/* Single process. */
			ASSERT(ssp->ss_pid == pidp->pid_id);

			mutex_enter(&pidlock);
			proc = prfind_zone(pidp->pid_id, ALL_ZONES);
			if (proc == NULL) {
				mutex_exit(&pidlock);
				continue;
			}
			/* Take p_lock before releasing pidlock. */
			mutex_enter(&proc->p_lock);
			mutex_exit(&pidlock);
			dosendsig(proc, ssp->ss_events, sevent, &info,
			    band, error);
			mutex_exit(&proc->p_lock);
		} else {
			/* Process group: pidlock is held across the walk. */
			pid_t pgrp = -ssp->ss_pid;

			mutex_enter(&pidlock);
			proc = pgfind_zone(pgrp, ALL_ZONES);
			while (proc != NULL) {
				mutex_enter(&proc->p_lock);
				dosendsig(proc, ssp->ss_events, sevent,
				    &info, band, error);
				mutex_exit(&proc->p_lock);
				proc = proc->p_pglink;
			}
			mutex_exit(&pidlock);
		}
	}
}
/*
 * Allocate a new queue pair, initialise it for a module or driver and call
 * that module's/driver's open routine.
 *
 *	qp		an existing read queue of the stream; supplies the stream
 *			pointer and, for inserts, becomes the new pair's q_next
 *	devp		device number; used to find the driver when fp is NULL and
 *			passed to the open routine
 *	oflag		open flags passed to the open routine
 *	crp		credentials passed to the open routine
 *	fp		module switch entry, or NULL when attaching a driver
 *	is_insert	B_TRUE when the module is being inserted (modules only)
 *
 * Returns 0 on success, otherwise the error returned by the open routine,
 * in which case the new queue pair has already been detached and freed.
 */
int
qattach(queue_t *qp, dev_t *devp, int oflag, cred_t *crp, fmodsw_impl_t *fp,
boolean_t is_insert)
{
major_t major;
cdevsw_impl_t *dp;
struct streamtab *str;
queue_t *rq;
queue_t *wrq;
uint32_t qflag;
uint32_t sqtype;
perdm_t *dmp;
int error;
int sflag;
/* New pair joins the same stream as qp. */
rq = allocq();
wrq = _WR(rq);
STREAM(rq) = STREAM(wrq) = STREAM(qp);
if (fp != NULL) {
/* Module: everything comes from the fmodsw entry. */
str = fp->f_str;
qflag = fp->f_qflag;
sqtype = fp->f_sqtype;
dmp = fp->f_dmp;
IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
sflag = MODOPEN;
/* Remembered so that qdetach() can release the fmodsw entry. */
rq->q_fp = fp;
} else {
/* Driver: look it up by major number in devimpl[]. */
ASSERT(!is_insert);
major = getmajor(*devp);
dp = &devimpl[major];
str = dp->d_str;
ASSERT(str == STREAMSTAB(major));
qflag = dp->d_qflag;
ASSERT(qflag & QISDRV);
sqtype = dp->d_sqtype;
/* Obtain the per-driver perimeter data on first need. */
if (NEED_DM(dp->d_dmp, qflag))
dp->d_dmp = hold_dm(str, qflag, sqtype);
dmp = dp->d_dmp;
sflag = 0;
}
TRACE_2(TR_FAC_STREAMS_FR, TR_QATTACH_FLAGS,
"qattach:qflag == %X(%X)", qflag, *devp);
setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype, B_FALSE);
/* For an insert, flag the pair and point the new read queue at qp. */
if (is_insert) {
rq->q_flag |= _QINSERTING;
rq->q_next = qp;
}
/* The open routine runs inside the SQ_OPENCLOSE claim on the syncq. */
entersq(rq->q_syncq, SQ_OPENCLOSE);
error = (*rq->q_qinfo->qi_qopen)(rq, devp, oflag, sflag, crp);
if (error != 0)
goto failed;
leavesq(rq->q_syncq, SQ_OPENCLOSE);
ASSERT(qprocsareon(rq));
return (0);
failed:
rq->q_flag &= ~_QINSERTING;
/*
 * If the queue behind wrq already points at it, the open routine linked
 * the pair into the stream before failing, so undo that.
 */
if (backq(wrq) != NULL && backq(wrq)->q_next == wrq)
qprocsoff(rq);
leavesq(rq->q_syncq, SQ_OPENCLOSE);
rq->q_next = wrq->q_next = NULL;
/* clmode == 0: free the pair without calling the close routine. */
qdetach(rq, 0, 0, crp, B_FALSE);
return (error);
}
/*
 * Re-invoke the open routine of an already attached module or driver.
 *
 *	qp	read queue of the module/driver (QREADR asserted)
 *	devp	device number; a private copy is handed to the open routine so
 *		the caller's value is never modified
 *	flag	open flags; only passed on when the write queue has no q_next,
 *		otherwise 0 is passed and the sflag argument is MODOPEN
 *	crp	credentials for the open routine
 *
 * The open routine runs inside the SQ_OPENCLOSE claim on the queue's syncq.
 * On failure STREOPENFAIL is set in the stream's sd_flag (under sd_lock) and
 * the open routine's error is returned; on success 0 is returned.
 */
int
qreopen(queue_t *qp, dev_t *devp, int flag, cred_t *crp)
{
	int error;
	dev_t dummydev;
	queue_t *wqp = _WR(qp);
	int open_flag;
	int open_sflag;

	ASSERT(qp->q_flag & QREADR);
	entersq(qp->q_syncq, SQ_OPENCLOSE);

	dummydev = *devp;
	if (wqp->q_next) {
		open_flag = 0;
		open_sflag = MODOPEN;
	} else {
		open_flag = flag;
		open_sflag = 0;
	}

	error = (*qp->q_qinfo->qi_qopen)(qp, &dummydev, open_flag,
	    open_sflag, crp);

	/* The perimeter is released on both the success and failure paths. */
	leavesq(qp->q_syncq, SQ_OPENCLOSE);

	if (error != 0) {
		mutex_enter(&STREAM(qp)->sd_lock);
		qp->q_stream->sd_flag |= STREOPENFAIL;
		mutex_exit(&STREAM(qp)->sd_lock);
		return (error);
	}

	ASSERT(qprocsareon(_RD(qp)));
	return (0);
}
/*
 * Detach a module or driver queue pair from its stream and free it.
 *
 *	qp		read queue of the pair
 *	clmode		non-zero: call the close routine; zero: just mark the
 *			queues closing via disable_svc() (used by qattach() when
 *			the open failed)
 *	flag, crp	passed through to the close routine
 *	is_remove	B_TRUE sets _QREMOVING on the read queue before closing
 *
 * The stream must be closing, being opened, or being plumbed (asserted).
 */
void
qdetach(queue_t *qp, int clmode, int flag, cred_t *crp, boolean_t is_remove)
{
queue_t *wqp = _WR(qp);
ASSERT(STREAM(qp)->sd_flag & (STRCLOSE|STWOPEN|STRPLUMB));
/* Let any service work already pending on the stream run first. */
if (STREAM_NEEDSERVICE(STREAM(qp)))
stream_runservice(STREAM(qp));
if (clmode) {
/* Wait until the write queue has nothing left queued on its syncq. */
wait_q_syncq(wqp);
entersq(qp->q_syncq, SQ_OPENCLOSE);
if (is_remove) {
mutex_enter(QLOCK(qp));
qp->q_flag |= _QREMOVING;
mutex_exit(QLOCK(qp));
}
(*qp->q_qinfo->qi_qclose)(qp, flag, crp);
/* The close routine is expected to have left QWCLOSE on both queues. */
ASSERT((qp->q_flag & QWCLOSE) && (wqp->q_flag & QWCLOSE));
leavesq(qp->q_syncq, SQ_OPENCLOSE);
} else {
disable_svc(qp);
}
/*
 * NOTE(review): flush_syncq() is invoked only as an ASSERT() argument.  If
 * ASSERT() compiles to nothing in non-DEBUG builds, these calls vanish with
 * it -- confirm that no flush is needed in production builds.
 */
ASSERT(flush_syncq(qp->q_syncq, qp) == 0);
ASSERT(flush_syncq(wqp->q_syncq, wqp) == 0);
/* Unless the perimeter is per-module (possibly shared), it must be empty. */
ASSERT((qp->q_flag & QPERMOD) ||
((qp->q_syncq->sq_head == NULL) &&
(wqp->q_syncq->sq_head == NULL)));
/* Modules carry an fmodsw reference (set in qattach()); drivers do not. */
ASSERT(qp->q_fp != NULL || qp->q_flag & QISDRV);
if (qp->q_fp != NULL)
fmodsw_rele(qp->q_fp);
freeq(qp);
}
/*
 * Mark both halves of a queue pair as closing.
 *
 * qp must be the read queue.  QWCLOSE is set first on the read queue and
 * then on its write partner, each under that queue's own QLOCK.
 */
void
disable_svc(queue_t *qp)
{
	queue_t *wqp = _WR(qp);
	queue_t *side[2];
	int n;

	ASSERT(qp->q_flag & QREADR);

	side[0] = qp;
	side[1] = wqp;
	for (n = 0; n < 2; n++) {
		mutex_enter(QLOCK(side[n]));
		side[n]->q_flag |= QWCLOSE;
		mutex_exit(QLOCK(side[n]));
	}
}
/*
 * Undo disable_svc(): clear the closing mark on both halves of a queue pair.
 *
 * qp must be the read queue.  QWCLOSE is cleared first on the read queue and
 * then on its write partner, each under that queue's own QLOCK.
 */
void
enable_svc(queue_t *qp)
{
	queue_t *wqp = _WR(qp);
	queue_t *side[2];
	int n;

	ASSERT(qp->q_flag & QREADR);

	side[0] = qp;
	side[1] = wqp;
	for (n = 0; n < 2; n++) {
		mutex_enter(QLOCK(side[n]));
		side[n]->q_flag &= ~QWCLOSE;
		mutex_exit(QLOCK(side[n]));
	}
}
/*
 * Take a queue off the global qhead/qtail run list, if it is there.
 *
 * The queue is only searched for when it is marked QENAB and the list looks
 * non-empty; both of those checks are made without holding service_queue.
 * The list itself is searched and modified under service_queue.  When the
 * queue was found and unlinked, the qremoved statistic is bumped and QENAB
 * is cleared.
 */
void
remove_runlist(queue_t *qp)
{
	queue_t *prev;
	queue_t *cur;
	int found;

	/* Nothing to do for a queue that is not enabled, or an empty list. */
	if (!(qp->q_flag & QENAB) || qhead == NULL)
		return;

	mutex_enter(&service_queue);
	RMQ(qp, qhead, qtail, q_link, prev, cur, found);
	mutex_exit(&service_queue);

	if (!found)
		return;

	STRSTAT(qremoved);
	qp->q_flag &= ~QENAB;
}
/*
 * Wait until no service activity remains for a queue pair.
 *
 * qp must be the read queue.  Steps:
 *   1. If the global run list is non-empty, try to pull both queues off it.
 *   2. Unless the queue is QPERMOD, call wait_sq_svc() on the read-side
 *      syncq and, if it is a different object, on the write-side syncq.
 *   3. For the read queue and then the write queue, sleep on q_wait (under
 *      QLOCK) until neither QINSERVICE nor QENAB is set.
 */
void
wait_svc(queue_t *qp)
{
	queue_t *wqp = _WR(qp);
	queue_t *side[2];
	int n;

	ASSERT(qp->q_flag & QREADR);

	if (qhead != NULL) {
		remove_runlist(qp);
		remove_runlist(wqp);
	}

	if ((qp->q_flag & QPERMOD) == 0) {
		syncq_t *read_sq = qp->q_syncq;
		syncq_t *write_sq = wqp->q_syncq;

		wait_sq_svc(read_sq);
		if (write_sq != read_sq)
			wait_sq_svc(write_sq);
	}

	side[0] = qp;
	side[1] = wqp;
	for (n = 0; n < 2; n++) {
		queue_t *q = side[n];

		mutex_enter(QLOCK(q));
		while ((q->q_flag & (QINSERVICE|QENAB)) != 0)
			cv_wait(&q->q_wait, QLOCK(q));
		mutex_exit(QLOCK(q));
	}
}
/*
 * Copy ioctl data in and hang it off an ioctl message.
 *
 *	bp	M_IOCTL or M_COPYIN message; the byte count is taken from
 *		ioc_count or cq_size respectively
 *	arg	source address of the data
 *	flag	must contain exactly one of U_TO_K / K_TO_K (selects the copy
 *		mode for strcopyin()); STR_NOSIG is forwarded to the allocator
 *	cr	credentials recorded with the allocated block
 *
 * A data block of `count' bytes is allocated with allocb_cred_wait(), filled
 * from `arg', stamped with the current process id and linked as bp->b_cont.
 *
 * Returns 0 on success.  On allocation failure returns the error reported
 * by allocb_cred_wait(); on copy failure frees the new block and returns the
 * strcopyin() error.  bp is left untouched in both failure cases.
 */
int
putiocd(mblk_t *bp, char *arg, int flag, cred_t *cr)
{
	mblk_t *datamp;
	ssize_t count;
	int error = 0;

	ASSERT((flag & (U_TO_K | K_TO_K)) == U_TO_K ||
	    (flag & (U_TO_K | K_TO_K)) == K_TO_K);

	if (bp->b_datap->db_type != M_IOCTL) {
		ASSERT(bp->b_datap->db_type == M_COPYIN);
		count = ((struct copyreq *)bp->b_rptr)->cq_size;
	} else {
		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
	}
	ASSERT(count >= 0);

	datamp = allocb_cred_wait(count, (flag & STR_NOSIG), &error, cr,
	    curproc->p_pid);
	if (datamp == NULL)
		return (error);

	error = strcopyin(arg, datamp->b_wptr, count, flag & (U_TO_K|K_TO_K));
	if (error == 0) {
		DB_CPID(datamp) = curproc->p_pid;
		datamp->b_wptr += count;
		bp->b_cont = datamp;
		return (0);
	}

	/* Copy failed: discard the block we just allocated. */
	freeb(datamp);
	return (error);
}
/*
 * Copy ioctl reply data out of a message.
 *
 *	bp		M_IOCACK or M_COPYOUT message; the byte count comes from
 *			ioc_count or cq_size respectively and the data from the
 *			b_cont chain
 *	arg		destination address
 *	copymode	passed to strcopyout()
 *
 * Walks the continuation blocks, copying min(remaining, block length) bytes
 * from each, until the count is exhausted or the chain ends.
 * Returns 0 on success or the first strcopyout() error.
 */
int
getiocd(mblk_t *bp, char *arg, int copymode)
{
	ssize_t count;
	size_t n;
	int error;
	mblk_t *blk;

	if (bp->b_datap->db_type == M_IOCACK) {
		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
	} else {
		ASSERT(bp->b_datap->db_type == M_COPYOUT);
		count = ((struct copyreq *)bp->b_rptr)->cq_size;
	}
	ASSERT(count >= 0);

	blk = bp->b_cont;
	while (blk && count) {
		n = MIN(count, blk->b_wptr - blk->b_rptr);
		error = strcopyout(blk->b_rptr, arg, n, copymode);
		if (error)
			return (error);

		count -= n;
		blk = blk->b_cont;
		arg += n;
	}

	/* The chain is expected to hold at least `count' bytes. */
	ASSERT(count == 0);
	return (0);
}
/*
 * Allocate a link record and put it at the front of linkinfo_list.
 *
 *	qup	stored as l_qtop of the link block
 *	qdown	stored as l_qbot of the link block
 *	fpdown	stored as li_fpdown
 *
 * The allocation sleeps until memory is available.  The list insertion and
 * the assignment of a fresh link index (lnk_id incremented, asserted not to
 * have become 0) are done under strresources.  Returns the new record.
 */
linkinfo_t *
alloclink(queue_t *qup, queue_t *qdown, file_t *fpdown)
{
	linkinfo_t *lip = kmem_cache_alloc(linkinfo_cache, KM_SLEEP);

	lip->li_lblk.l_qtop = qup;
	lip->li_lblk.l_qbot = qdown;
	lip->li_fpdown = fpdown;

	mutex_enter(&strresources);

	/* Push onto the head of the doubly linked list. */
	lip->li_prev = NULL;
	lip->li_next = linkinfo_list;
	if (linkinfo_list)
		linkinfo_list->li_prev = lip;
	linkinfo_list = lip;

	/* Hand out the next link index. */
	lnk_id++;
	lip->li_lblk.l_index = lnk_id;
	ASSERT(lnk_id != 0);

	mutex_exit(&strresources);

	return (lip);
}
/*
 * Unlink a link record from linkinfo_list (under strresources) and return it
 * to linkinfo_cache.
 */
void
lbfree(linkinfo_t *linkp)
{
	linkinfo_t *after;
	linkinfo_t *before;

	mutex_enter(&strresources);

	after = linkp->li_next;
	before = linkp->li_prev;

	if (after)
		after->li_prev = before;

	if (before)
		before->li_next = after;
	else
		linkinfo_list = after;	/* linkp was the list head */

	mutex_exit(&strresources);

	kmem_cache_free(linkinfo_cache, linkp);
}
/*
 * Decide whether linking lostp underneath upstp would create a cycle in the
 * multiplexor graph kept in ss->ss_mux_nodes.
 *
 * Returns 1 when a node whose mn_imaj equals the upper stream's major is
 * reachable from the lower stream's node by following outgoing edges, and 0
 * otherwise.  A lower stream whose vnode is a FIFO always yields 0.
 *
 * The walk is an iterative depth-first traversal that keeps its state in the
 * nodes themselves: mn_originp is the node we arrived from (used to
 * backtrack) and mn_startp is the next outgoing edge still to be explored.
 * The lower major is used as an index into ss_mux_nodes without a range
 * check here, so callers must have validated it.
 */
int
linkcycle(stdata_t *upstp, stdata_t *lostp, str_stack_t *ss)
{
struct mux_node *np;
struct mux_edge *ep;
int i;
major_t lomaj;
major_t upmaj;
if (lostp->sd_vnode->v_type == VFIFO)
return (0);
/* Reset the per-node traversal state left by any earlier walk. */
for (i = 0; i < ss->ss_devcnt; i++) {
np = &ss->ss_mux_nodes[i];
MUX_CLEAR(np);
}
lomaj = getmajor(lostp->sd_vnode->v_rdev);
upmaj = getmajor(upstp->sd_vnode->v_rdev);
np = &ss->ss_mux_nodes[lomaj];
for (;;) {
if (!MUX_DIDVISIT(np)) {
/* First arrival at this node: is it the upper stream's driver? */
if (np->mn_imaj == upmaj)
return (1);
if (np->mn_outp == NULL) {
/* Leaf: mark it and backtrack, or finish if this is the root. */
MUX_VISIT(np);
if (np->mn_originp == NULL)
return (0);
np = np->mn_originp;
continue;
}
/* Interior node: mark it and start at its first outgoing edge. */
MUX_VISIT(np);
np->mn_startp = np->mn_outp;
} else {
if (np->mn_startp == NULL) {
/* All edges of this node explored: backtrack or finish. */
if (np->mn_originp == NULL)
return (0);
else {
np = np->mn_originp;
continue;
}
}
/* Consume the next edge and descend into its target node. */
ep = np->mn_startp;
np->mn_startp = ep->me_nextp;
/* Edges without a target node are skipped. */
if (ep->me_nodep == NULL)
continue;
ep->me_nodep->mn_originp = np;
np = ep->me_nodep;
}
}
}
/*
 * Look up a link record.
 *
 *	stp	upper stream
 *	index	link index to match, or 0 to accept the first candidate
 *	type	LINKNORMAL or LINKPERSIST (after masking with LINKTYPEMASK)
 *	ss	per-netstack STREAMS state holding the mux node table
 *
 * LINKNORMAL: returns the first record on linkinfo_list whose l_qtop is the
 * last same-stream queue below stp's write queue and whose index matches.
 *
 * LINKPERSIST: first finds a matching outgoing edge of the mux node for
 * stp's major, then returns the first record with a NULL l_qtop whose
 * l_index equals that edge's mux id.
 *
 * The search runs under strresources.  Returns NULL when nothing matches.
 */
linkinfo_t *
findlinks(stdata_t *stp, int index, int type, str_stack_t *ss)
{
	linkinfo_t *linkp;
	linkinfo_t *match = NULL;
	struct mux_edge *mep;
	struct mux_node *mnp;
	queue_t *qup;

	mutex_enter(&strresources);

	if ((type & LINKTYPEMASK) == LINKNORMAL) {
		qup = getendq(stp->sd_wrq);
		for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
			if (qup != linkp->li_lblk.l_qtop)
				continue;
			if (!index || index == linkp->li_lblk.l_index) {
				match = linkp;
				break;
			}
		}
	} else {
		ASSERT((type & LINKTYPEMASK) == LINKPERSIST);

		/* Locate the edge first ... */
		mnp = &ss->ss_mux_nodes[getmajor(stp->sd_vnode->v_rdev)];
		for (mep = mnp->mn_outp; mep; mep = mep->me_nextp) {
			if ((index == 0) || (index == mep->me_muxid))
				break;
		}

		/* ... then the persistent link record that goes with it. */
		if (mep) {
			for (linkp = linkinfo_list; linkp;
			    linkp = linkp->li_next) {
				if (linkp->li_lblk.l_qtop)
					continue;
				if (mep->me_muxid == linkp->li_lblk.l_index) {
					match = linkp;
					break;
				}
			}
		}
	}

	mutex_exit(&strresources);
	return (match);
}
/*
 * Follow q_next from q for as long as _SAMESTR() holds and return the queue
 * at which it stops.  q must not be NULL.
 */
queue_t *
getendq(queue_t *q)
{
	queue_t *cur;

	ASSERT(q != NULL);

	for (cur = q; _SAMESTR(cur); cur = cur->q_next)
		continue;

	return (cur);
}
/*
 * Sleep until the syncq's aggregate count drops to zero.
 *
 * The count is sampled from sq_count with SQLOCK held and then adjusted by
 * SUM_SQ_PUTCOUNTS() while the put locks are held.
 * NOTE(review): SUM_SQ_PUTCOUNTS() is defined elsewhere; presumably it adds
 * the per-CPU ciputctrl counts into `count' -- confirm.
 * While the total is non-zero, SQ_WANTWAKEUP is set, the put locks are
 * released and the thread waits on sq_wait; after waking the count is
 * recomputed the same way.
 */
static void
wait_syncq(syncq_t *sq)
{
uint16_t count;
mutex_enter(SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
while (count != 0) {
/* Ask to be woken, then drop the put locks before sleeping. */
sq->sq_flags |= SQ_WANTWAKEUP;
SQ_PUTLOCKS_EXIT(sq);
cv_wait(&sq->sq_wait, SQLOCK(sq));
/* Re-sample under the same locks as before. */
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
SQ_PUTLOCKS_EXIT(sq);
mutex_exit(SQLOCK(sq));
}
/*
 * Wait until queue q is no longer linked on its syncq and has no messages
 * pending there.
 *
 * A first check is made without any lock; if the queue is already idle the
 * function returns immediately.  Otherwise the condition is re-evaluated
 * under SQLOCK, setting SQ_WANTWAKEUP and sleeping on sq_wait until it
 * clears.
 */
static void
wait_q_syncq(queue_t *q)
{
	syncq_t *sq;

	/* Unlocked fast path: nothing queued, nothing to wait for. */
	if (!((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)))
		return;

	sq = q->q_syncq;

	mutex_enter(SQLOCK(sq));
	for (;;) {
		if (!((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)))
			break;
		sq->sq_flags |= SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
	}
	mutex_exit(SQLOCK(sq));
}
/*
 * Link the stream behind fpdown underneath the multiplexor stream vp
 * (I_LINK / I_PLINK).
 *
 *	vp	vnode of the upper (multiplexor) stream
 *	cmd	I_LINK or I_PLINK
 *	fpdown	file of the lower stream; NULL yields EBADF
 *	crp	caller credentials (also selects the netstack)
 *	rvalp	receives the new link index on success
 *	lhlink	passed through to ldi_mlink_fp()
 *
 * Returns 0 on success.  Errors visible here: ENXIO (upper stream hung up or
 * already linked under a mux), EINVAL (upper stream unsuitable, lower stream
 * unsuitable, or the link would form a cycle), EBADF (no lower file), or
 * whatever strdoioctl() / ldi_mlink_fp() return.
 *
 * Order of operations on the lower stream, as implemented below:
 *   1. set STRPLUMB
 *   2. insert the pass-through queue (link_addpassthru())
 *   3. set STPLEX
 *   4. repoint the lower stream head's queues at the mux's lower qinit (setq)
 *   5. send the link ioctl to the upper stream
 *   6. remove the pass-through queue and clear STRPLUMB
 * On failure after step 4 the `cleanup' path restores the stream head
 * queues, clears STPLEX, removes the pass-through queue and clears STRPLUMB.
 * muxifier is held from the first lower-stream check until return.
 */
int
mlink_file(vnode_t *vp, int cmd, struct file *fpdown, cred_t *crp, int *rvalp,
int lhlink)
{
struct stdata *stp;
struct strioctl strioc;
struct linkinfo *linkp;
struct stdata *stpdown;
struct streamtab *str;
queue_t *passq;
syncq_t *passyncq;
queue_t *rq;
cdevsw_impl_t *dp;
uint32_t qflag;
uint32_t sqtype;
perdm_t *dmp;
int error = 0;
netstack_t *ns;
str_stack_t *ss;
stp = vp->v_stream;
TRACE_1(TR_FAC_STREAMS_FR,
TR_I_LINK, "I_LINK/I_PLINK:stp %p", stp);
/* Validate the upper stream: not hung up, not a FIFO, and a real mux. */
if (stp->sd_flag & STRHUP) {
return (ENXIO);
}
if (vp->v_type == VFIFO) {
return (EINVAL);
}
if (stp->sd_strtab == NULL) {
return (EINVAL);
}
if (!stp->sd_strtab->st_muxwinit) {
return (EINVAL);
}
if (fpdown == NULL) {
return (EBADF);
}
/* From here on every return path must release the netstack reference. */
ns = netstack_find_by_cred(crp);
ASSERT(ns != NULL);
ss = ns->netstack_str;
ASSERT(ss != NULL);
/* The upper major indexes ss_mux_nodes[], so it must be in range. */
if (getmajor(stp->sd_vnode->v_rdev) >= ss->ss_devcnt) {
netstack_rele(ss->ss_netstack);
return (EINVAL);
}
mutex_enter(&muxifier);
if (stp->sd_flag & STPLEX) {
mutex_exit(&muxifier);
netstack_rele(ss->ss_netstack);
return (ENXIO);
}
/*
 * Validate the lower stream: it must exist, differ from the upper stream,
 * carry none of the listed state flags, have an in-range major unless it is
 * a FIFO, and not introduce a cycle.
 */
if (((stpdown = fpdown->f_vnode->v_stream) == NULL) ||
(stpdown == stp) || (stpdown->sd_flag &
(STPLEX|STRHUP|STRDERR|STWRERR|IOCWAIT|STRPLUMB)) ||
((stpdown->sd_vnode->v_type != VFIFO) &&
(getmajor(stpdown->sd_vnode->v_rdev) >= ss->ss_devcnt)) ||
linkcycle(stp, stpdown, ss)) {
mutex_exit(&muxifier);
netstack_rele(ss->ss_netstack);
return (EINVAL);
}
TRACE_1(TR_FAC_STREAMS_FR,
TR_STPDOWN, "stpdown:%p", stpdown);
/* Persistent links record a NULL top queue (see findlinks()). */
rq = getendq(stp->sd_wrq);
if (cmd == I_PLINK)
rq = NULL;
linkp = alloclink(rq, stpdown->sd_wrq, fpdown);
/* The ioctl payload is the link block inside the new link record. */
strioc.ic_cmd = cmd;
strioc.ic_timout = INFTIM;
strioc.ic_len = sizeof (struct linkblk);
strioc.ic_dp = (char *)&linkp->li_lblk;
/* Step 1: flag the lower stream as being replumbed. */
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag |= STRPLUMB;
mutex_exit(&stpdown->sd_lock);
/* Step 2: insert the pass-through queue on the lower stream. */
passq = link_addpassthru(stpdown);
/* Step 3: mark the lower stream as linked under a multiplexor. */
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag |= STPLEX;
mutex_exit(&stpdown->sd_lock);
/* rq now refers to the LOWER stream head's read queue. */
rq = _RD(stpdown->sd_wrq);
/* Quiesce background syncq processing before changing the queues. */
wait_sq_svc(rq->q_syncq);
passyncq = passq->q_syncq;
if (!(passyncq->sq_flags & SQ_BLOCKED))
blocksq(passyncq, SQ_BLOCKED, 0);
ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
rq->q_ptr = _WR(rq)->q_ptr = NULL;
/* Step 4: look up the upper driver's perimeter settings and apply setq. */
str = stp->sd_strtab;
dp = &devimpl[getmajor(vp->v_rdev)];
ASSERT(dp->d_str == str);
qflag = dp->d_qflag;
sqtype = dp->d_sqtype;
if (NEED_DM(dp->d_dmp, qflag))
dp->d_dmp = hold_dm(str, qflag, sqtype);
dmp = dp->d_dmp;
setq(rq, str->st_muxrinit, str->st_muxwinit, dmp, qflag, sqtype,
B_TRUE);
/* Step 5: tell the multiplexor about the new lower stream. */
error = strdoioctl(stp, &strioc, FNATIVE,
K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
if (error != 0)
goto cleanup;
/* The link itself holds a reference on the lower file. */
mutex_enter(&fpdown->f_tlock);
fpdown->f_count++;
mutex_exit(&fpdown->f_tlock);
/* Record the link with the layered driver framework. */
ASSERT((cmd == I_LINK) || (cmd == I_PLINK));
if (cmd == I_LINK) {
error = ldi_mlink_fp(stp, fpdown, lhlink, LINKNORMAL);
} else {
error = ldi_mlink_fp(stp, fpdown, lhlink, LINKPERSIST);
}
if (error != 0) {
/* Give back the file reference taken above before unwinding. */
mutex_enter(&fpdown->f_tlock);
fpdown->f_count--;
mutex_exit(&fpdown->f_tlock);
goto cleanup;
}
/* Step 6: link is in place; remove the pass-through queue. */
link_rempassthru(passq);
mux_addedge(stp, stpdown, linkp->li_lblk.l_index, ss);
/* Only non-persistent links mark the upper stream with STRHASLINKS. */
if (cmd == I_LINK) {
mutex_enter(&stp->sd_lock);
stp->sd_flag |= STRHASLINKS;
mutex_exit(&stp->sd_lock);
}
/* Clear STRPLUMB and wake everything sleeping on the lower stream. */
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STRPLUMB;
cv_broadcast(&rq->q_wait);
cv_broadcast(&_WR(rq)->q_wait);
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
mutex_exit(&muxifier);
*rvalp = linkp->li_lblk.l_index;
netstack_rele(ss->ss_netstack);
return (0);
cleanup:
/* Undo everything done since alloclink(), in reverse order. */
lbfree(linkp);
if (!(passyncq->sq_flags & SQ_BLOCKED))
blocksq(passyncq, SQ_BLOCKED, 0);
/* Turn the lower stream head's queues back into stream head queues. */
rq->q_ptr = _WR(rq)->q_ptr = stpdown;
setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO,
B_TRUE);
/* STPLEX is cleared before the pass-through queue is removed. */
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STPLEX;
mutex_exit(&stpdown->sd_lock);
link_rempassthru(passq);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STRPLUMB;
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
mutex_exit(&muxifier);
netstack_rele(ss->ss_netstack);
return (error);
}
/*
 * File-descriptor front end for mlink_file().
 *
 * Resolves descriptor `arg' to a file with getf(), performs the link, and
 * releases the descriptor again if it resolved.  An unresolvable descriptor
 * is still passed on as NULL, leaving the error choice to mlink_file().
 * Returns mlink_file()'s result.
 */
int
mlink(vnode_t *vp, int cmd, int arg, cred_t *crp, int *rvalp, int lhlink)
{
	struct file *lower = getf(arg);
	int rc;

	rc = mlink_file(vp, cmd, lower, crp, rvalp, lhlink);

	if (lower != NULL) {
		releasef(arg);
	}

	return (rc);
}
/*
 * Unlink the lower stream described by linkp from beneath the upper
 * (multiplexor) stream stp.
 *
 * Must be entered with muxifier held (asserted below); every return path
 * in this function drops muxifier before returning.  The link type is
 * taken from (flag & LINKTYPEMASK); LINKCLOSE in flag turns a failed
 * unlink ioctl into a warning instead of an error return.
 *
 * Returns 0 on success, or the error from strdoioctl() when the ioctl
 * fails and LINKCLOSE is not set.
 */
int
munlink(stdata_t *stp, linkinfo_t *linkp, int flag, cred_t *crp, int *rvalp,
str_stack_t *ss)
{
struct strioctl strioc;
struct stdata *stpdown;
queue_t *rq, *wrq;
queue_t *passq;
syncq_t *passyncq;
int error = 0;
file_t *fpdown;
ASSERT(MUTEX_HELD(&muxifier));
stpdown = linkp->li_fpdown->f_vnode->v_stream;
/* Flag the lower stream as being plumbed and add a passthru queue. */
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag |= STRPLUMB;
mutex_exit(&stpdown->sd_lock);
passq = link_addpassthru(stpdown);
/* Build the I_UNLINK / I_PUNLINK ioctl carrying the link block. */
if ((flag & LINKTYPEMASK) == LINKNORMAL)
strioc.ic_cmd = I_UNLINK;
else
strioc.ic_cmd = I_PUNLINK;
strioc.ic_timout = INFTIM;
strioc.ic_len = sizeof (struct linkblk);
strioc.ic_dp = (char *)&linkp->li_lblk;
error = strdoioctl(stp, &strioc, FNATIVE,
K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
if (error) {
if (flag & LINKCLOSE) {
/* Closing: report the failure but carry on with the teardown. */
cmn_err(CE_WARN, "KERNEL: munlink: could not perform "
"unlink ioctl, closing anyway (%d)\n", error);
} else {
/* Undo the plumbing state set up above and fail the unlink. */
link_rempassthru(passq);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STRPLUMB;
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
mutex_exit(&muxifier);
return (error);
}
}
/* Drop the mux graph edge and the link record, then release muxifier. */
mux_rmvedge(stp, linkp->li_lblk.l_index, ss);
fpdown = linkp->li_fpdown;
lbfree(linkp);
mutex_exit(&muxifier);
wrq = stpdown->sd_wrq;
rq = _RD(wrq);
/* Stop service procedures on the lower read queue and wait for them. */
disable_svc(rq);
wait_svc(rq);
/* For a per-module perimeter, wait until wrq is no longer queued on it. */
if (wrq->q_flag & QPERMOD) {
syncq_t *sq = wrq->q_syncq;
mutex_enter(SQLOCK(sq));
while (wrq->q_sqflags & Q_SQQUEUED) {
sq->sq_flags |= SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
}
mutex_exit(SQLOCK(sq));
}
/* Block the passthru syncq (if not already) and wait on the perimeters. */
passyncq = passq->q_syncq;
if (!(passyncq->sq_flags & SQ_BLOCKED)) {
syncq_t *sq, *outer;
blocksq(passyncq, SQ_BLOCKED, 0);
sq = rq->q_syncq;
if ((outer = sq->sq_outer) != NULL) {
wait_syncq(outer);
}
wait_syncq(sq);
ASSERT((rq->q_flag & QPERMOD) ||
((rq->q_syncq->sq_head == NULL) &&
(_WR(rq)->q_syncq->sq_head == NULL)));
}
/* Same Q_SQQUEUED wait as above, this time for the read queue. */
if (rq->q_flag & QPERMOD) {
syncq_t *sq = rq->q_syncq;
mutex_enter(SQLOCK(sq));
while (rq->q_sqflags & Q_SQQUEUED) {
sq->sq_flags |= SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
}
mutex_exit(SQLOCK(sq));
}
/*
 * NOTE(review): these flush_syncq() calls sit inside ASSERT(), so they
 * are only evaluated when ASSERT is compiled in.
 */
ASSERT(flush_syncq(rq->q_syncq, rq) == 0);
ASSERT(flush_syncq(wrq->q_syncq, wrq) == 0);
flushq(rq, FLUSHALL);
flushq(_WR(rq), FLUSHALL);
/* Point the queue pair back at stpdown with the strdata/stwdata qinits. */
rq->q_ptr = wrq->q_ptr = stpdown;
setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO, B_TRUE);
ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
enable_svc(rq);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STPLEX;
mutex_exit(&stpdown->sd_lock);
ASSERT(((flag & LINKTYPEMASK) == LINKNORMAL) ||
((flag & LINKTYPEMASK) == LINKPERSIST));
if ((flag & LINKTYPEMASK) == LINKNORMAL) {
VERIFY0(ldi_munlink_fp(stp, fpdown, LINKNORMAL));
} else {
VERIFY0(ldi_munlink_fp(stp, fpdown, LINKPERSIST));
}
/* Remove the passthru queue, clear STRPLUMB and wake sd_monitor waiters. */
link_rempassthru(passq);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STRPLUMB;
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
/* Close the lower file that the link record referenced. */
(void) closef(fpdown);
return (0);
}
/*
 * Unlink every link that findlinks() reports for stp with the given flag.
 *
 * muxifier is taken before each findlinks() call.  munlink() returns with
 * muxifier already released, so it is re-acquired after each successful
 * unlink and is not held when an error is returned.
 *
 * Returns 0 once no links remain, or the first non-zero munlink() error.
 */
int
munlinkall(stdata_t *stp, int flag, cred_t *crp, int *rvalp, str_stack_t *ss)
{
	linkinfo_t *lp;
	int rc;

	mutex_enter(&muxifier);
	for (;;) {
		lp = findlinks(stp, 0, flag, ss);
		if (lp == NULL)
			break;

		rc = munlink(stp, lp, flag, crp, rvalp, ss);
		if (rc != 0)
			return (rc);

		mutex_enter(&muxifier);
	}
	mutex_exit(&muxifier);
	return (0);
}
/*
 * Append an edge for link `muxid` to the outgoing-edge list of the mux
 * node indexed by the upper stream's major number.
 *
 * The edge records the upper stream's device number and points at the mux
 * node indexed by the lower stream's major number, or at NULL when the
 * lower stream's vnode is a FIFO.
 */
void
mux_addedge(stdata_t *upstp, stdata_t *lostp, int muxid, str_stack_t *ss)
{
	major_t upper_major = getmajor(upstp->sd_vnode->v_rdev);
	major_t lower_major = getmajor(lostp->sd_vnode->v_rdev);
	struct mux_node *upper = &ss->ss_mux_nodes[upper_major];
	struct mux_edge **tailp;
	struct mux_edge *edge;

	/* Find the link slot at the end of the outgoing edge list. */
	tailp = &upper->mn_outp;
	while (*tailp != NULL)
		tailp = &(*tailp)->me_nextp;

	edge = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
	*tailp = edge;

	edge->me_nextp = NULL;
	edge->me_muxid = muxid;
	edge->me_dev = upstp->sd_vnode->v_rdev;
	edge->me_nodep = (lostp->sd_vnode->v_type == VFIFO) ?
	    NULL : &ss->ss_mux_nodes[lower_major];
}
/*
 * Remove and free the edge with id `muxid` from the outgoing-edge list of
 * the mux node indexed by the upper stream's major number.
 *
 * The list is asserted to be non-empty, and failing to find the edge
 * trips ASSERT(0).
 */
void
mux_rmvedge(stdata_t *upstp, int muxid, str_stack_t *ss)
{
	struct mux_node *upper;
	struct mux_edge **linkp;
	struct mux_edge *edge;

	upper = &ss->ss_mux_nodes[getmajor(upstp->sd_vnode->v_rdev)];
	ASSERT(upper->mn_outp != NULL);

	/* linkp always addresses the pointer that refers to `edge`. */
	for (linkp = &upper->mn_outp; (edge = *linkp) != NULL;
	    linkp = &edge->me_nextp) {
		if (edge->me_muxid != muxid)
			continue;

		*linkp = edge->me_nextp;
		kmem_free(edge, sizeof (struct mux_edge));
		return;
	}

	/* The requested edge was not on the list. */
	ASSERT(0);
}
/*
 * Translate a driver/module's MT-related device flags into queue flags
 * and a syncq type.
 *
 *   stp      - streamtab of the driver; used only to name the driver in
 *              the warning emitted for an invalid flag combination.
 *   devflag  - device flags to translate.
 *   qflagp   - out: resulting queue flags (written only on success).
 *   sqtypep  - out: resulting syncq type (written only on success).
 *
 * Returns 0 on success.  On an unsupported combination of flags, logs a
 * warning and returns EINVAL without touching *qflagp / *sqtypep.
 */
int
devflg_to_qflag(struct streamtab *stp, uint32_t devflag, uint32_t *qflagp,
    uint32_t *sqtypep)
{
	uint32_t qflag = 0;
	uint32_t sqtype = 0;

	if (devflag & _D_OLD)
		goto bad;

	/* Inner perimeter selection. */
	switch (devflag & D_MTINNER_MASK) {
	case D_MP:
		qflag |= QMTSAFE;
		sqtype |= SQ_CI;
		break;
	case D_MTPERQ|D_MP:
		qflag |= QPERQ;
		break;
	case D_MTQPAIR|D_MP:
		qflag |= QPAIR;
		break;
	case D_MTPERMOD|D_MP:
		qflag |= QPERMOD;
		break;
	default:
		goto bad;
	}

	/* An outer perimeter is rejected in combination with D_MTPERMOD. */
	if (devflag & D_MTOUTPERIM) {
		switch (devflag & D_MTINNER_MASK) {
		case D_MP:
		case D_MTPERQ|D_MP:
		case D_MTQPAIR|D_MP:
			break;
		default:
			goto bad;
		}
		qflag |= QMTOUTPERIM;
	}

	/*
	 * Inner perimeter modifiers are rejected for plain D_MP.
	 * NOTE(review): the put/open-close/callback/service sharing flags
	 * are only examined when D_MTINNER_MOD is set - confirm intended.
	 */
	if (devflag & D_MTINNER_MOD) {
		switch (devflag & D_MTINNER_MASK) {
		case D_MP:
			goto bad;
		default:
			break;
		}
		if (devflag & D_MTPUTSHARED)
			sqtype |= SQ_CIPUT;
		if (devflag & _D_MTOCSHARED) {
			/* Shared open/close requires shared put. */
			if (!(devflag & D_MTPUTSHARED))
				goto bad;
			sqtype |= SQ_CIOC;
		}
		if (devflag & _D_MTCBSHARED) {
			/* Shared callbacks require shared put. */
			if (!(devflag & D_MTPUTSHARED))
				goto bad;
			sqtype |= SQ_CICB;
		}
		if (devflag & _D_MTSVCSHARED) {
			/* Shared service needs shared put and QPERMOD. */
			if (!(devflag & D_MTPUTSHARED) || !(qflag & QPERMOD))
				goto bad;
			sqtype |= SQ_CISVC;
		}
	}

	/* Default outer perimeter concurrency. */
	sqtype |= SQ_CO;

	/* Exclusive open/close is only accepted with an outer perimeter. */
	if (devflag & D_MTOCEXCL) {
		if (!(devflag & D_MTOUTPERIM)) {
			goto bad;
		}
		sqtype &= ~SQ_COOC;
	}

	if (devflag & D_SYNCSTR)
		qflag |= QSYNCSTR;

	/* _D_DIRECT is only accepted for QMTSAFE perimeters. */
	if (devflag & _D_DIRECT) {
		if ((qflag & QMT_TYPEMASK) != QMTSAFE)
			goto bad;
		qflag |= _QDIRECT;
	}

	if (devflag & _D_SINGLE_INSTANCE)
		qflag |= _QSINGLE_INSTANCE;

	*qflagp = qflag;
	*sqtypep = sqtype;
	return (0);

bad:
	/*
	 * Report the driver-supplied flags that were rejected.  The mask is
	 * a device-flag mask, so it must be applied to devflag; applying it
	 * to the partially built qflag printed unrelated bits (0 on the
	 * early failure paths).
	 */
	cmn_err(CE_WARN,
	    "stropen: bad MT flags (0x%x) in driver '%s'",
	    (int)(devflag & D_MTSAFETY_MASK),
	    stp->st_rdinit->qi_minfo->mi_idname);
	return (EINVAL);
}
/*
 * (Re)initialize the queue pair whose read side is rq.
 *
 *   rinit/winit - qinit structures installed on the read/write queue;
 *                 water marks and packet size limits are copied from
 *                 their qi_minfo.
 *   dmp         - per-driver/module data supplying dm_sq; required
 *                 (see IMPLY below) when qflag has QPERMOD or
 *                 QMTOUTPERIM.
 *   qflag       - queue flags to install; must include a QMT_TYPEMASK bit.
 *   sqtype      - syncq type for the pair's embedded syncq.
 *   lock_needed - when true, QLOCK is taken around each q_flag update.
 *
 * Any previous perimeter configuration is torn down first: the syncqs are
 * removed from their outer perimeter, syncqs other than the embedded
 * SQ(rq) are freed (a read-side syncq is kept when QPERMOD is set on rq),
 * and ciputctrl arrays are returned to their cache.
 */
void
setq(queue_t *rq, struct qinit *rinit, struct qinit *winit,
perdm_t *dmp, uint32_t qflag, uint32_t sqtype, boolean_t lock_needed)
{
queue_t *wq;
syncq_t *sq, *outer;
ASSERT(rq->q_flag & QREADR);
ASSERT((qflag & QMT_TYPEMASK) != 0);
IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
wq = _WR(rq);
/* Install qinit pointers and copy limits from the module info. */
rq->q_qinfo = rinit;
rq->q_hiwat = rinit->qi_minfo->mi_hiwat;
rq->q_lowat = rinit->qi_minfo->mi_lowat;
rq->q_minpsz = rinit->qi_minfo->mi_minpsz;
rq->q_maxpsz = rinit->qi_minfo->mi_maxpsz;
wq->q_qinfo = winit;
wq->q_hiwat = winit->qi_minfo->mi_hiwat;
wq->q_lowat = winit->qi_minfo->mi_lowat;
wq->q_minpsz = winit->qi_minfo->mi_minpsz;
wq->q_maxpsz = winit->qi_minfo->mi_maxpsz;
/* Detach the current syncq(s) from any outer perimeter. */
sq = rq->q_syncq;
outer = sq->sq_outer;
if (outer != NULL) {
ASSERT(wq->q_syncq->sq_outer == outer);
outer_remove(outer, rq->q_syncq);
if (wq->q_syncq != rq->q_syncq)
outer_remove(outer, wq->q_syncq);
}
ASSERT(sq->sq_outer == NULL);
ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
/*
 * If the read side is not using the embedded syncq, drop it; it is
 * freed unless QPERMOD is set on rq.
 */
if (sq != SQ(rq)) {
if (!(rq->q_flag & QPERMOD))
free_syncq(sq);
if (wq->q_syncq == rq->q_syncq)
wq->q_syncq = NULL;
rq->q_syncq = NULL;
}
/* Free a write-side syncq that is neither the old read one nor SQ(rq). */
if (wq->q_syncq != NULL && wq->q_syncq != sq &&
wq->q_syncq != SQ(rq)) {
free_syncq(wq->q_syncq);
wq->q_syncq = NULL;
}
ASSERT(rq->q_syncq == NULL || (rq->q_syncq->sq_head == NULL &&
rq->q_syncq->sq_tail == NULL));
ASSERT(wq->q_syncq == NULL || (wq->q_syncq->sq_head == NULL &&
wq->q_syncq->sq_tail == NULL));
/* Return any ciputctrl array on the remaining syncqs to its cache. */
if (!(rq->q_flag & QPERMOD) &&
rq->q_syncq != NULL && rq->q_syncq->sq_ciputctrl != NULL) {
ASSERT(rq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
SUMCHECK_CIPUTCTRL_COUNTS(rq->q_syncq->sq_ciputctrl,
rq->q_syncq->sq_nciputctrl, 0);
ASSERT(ciputctrl_cache != NULL);
kmem_cache_free(ciputctrl_cache, rq->q_syncq->sq_ciputctrl);
rq->q_syncq->sq_ciputctrl = NULL;
rq->q_syncq->sq_nciputctrl = 0;
}
if (!(wq->q_flag & QPERMOD) &&
wq->q_syncq != NULL && wq->q_syncq->sq_ciputctrl != NULL) {
ASSERT(wq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
SUMCHECK_CIPUTCTRL_COUNTS(wq->q_syncq->sq_ciputctrl,
wq->q_syncq->sq_nciputctrl, 0);
ASSERT(ciputctrl_cache != NULL);
kmem_cache_free(ciputctrl_cache, wq->q_syncq->sq_ciputctrl);
wq->q_syncq->sq_ciputctrl = NULL;
wq->q_syncq->sq_nciputctrl = 0;
}
/* Start over with both queues on the embedded syncq. */
sq = SQ(rq);
ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
ASSERT(sq->sq_outer == NULL);
ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
ASSERT((sq->sq_flags & ~SQ_TYPES_IN_FLAGS) == 0);
rq->q_syncq = wq->q_syncq = sq;
sq->sq_type = sqtype;
sq->sq_flags = (sqtype & SQ_TYPES_IN_FLAGS);
ASSERT((sq->sq_svcflags & SQ_SERVICE) == 0);
sq->sq_svcflags = 0;
/* Replace the QMT_TYPEMASK bits with qflag and set QWANTR on both queues. */
if (lock_needed) {
mutex_enter(QLOCK(rq));
rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
mutex_exit(QLOCK(rq));
mutex_enter(QLOCK(wq));
wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
mutex_exit(QLOCK(wq));
} else {
rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
}
/* QPERQ: the write queue gets its own syncq, typed like the read one. */
if (qflag & QPERQ) {
sq = new_syncq();
sq->sq_type = rq->q_syncq->sq_type;
sq->sq_flags = rq->q_syncq->sq_flags;
ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL);
wq->q_syncq = sq;
}
/* QPERMOD: both queues use the syncq held in dmp. */
if (qflag & QPERMOD) {
sq = dmp->dm_sq;
ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL);
rq->q_syncq = wq->q_syncq = sq;
}
/* QMTOUTPERIM: dmp's syncq is the outer perimeter for the pair's syncq(s). */
if (qflag & QMTOUTPERIM) {
outer = dmp->dm_sq;
ASSERT(outer->sq_outer == NULL);
outer_insert(outer, rq->q_syncq);
if (wq->q_syncq != rq->q_syncq)
outer_insert(outer, wq->q_syncq);
}
ASSERT((rq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
(rq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
ASSERT((wq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
(wq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
ASSERT((rq->q_flag & QMT_TYPEMASK) == (qflag & QMT_TYPEMASK));
/* The struio type comes from the qinit only when QSYNCSTR is set. */
rq->q_struiot =
(rq->q_flag & QSYNCSTR) ? rinit->qi_struiot : STRUIOT_NONE;
wq->q_struiot =
(wq->q_flag & QSYNCSTR) ? winit->qi_struiot : STRUIOT_NONE;
}
/*
 * Return the perdm_t for streamtab `str` with its reference count bumped,
 * creating and appending one to perdm_list if none exists yet.
 *
 * The list is first searched under the reader lock.  On a miss a new
 * syncq and perdm_t are built with no lock held, then the list is searched
 * again under the writer lock in case another thread added an entry in
 * between; in that case the freshly built objects are discarded.
 */
perdm_t *
hold_dm(struct streamtab *str, uint32_t qflag, uint32_t sqtype)
{
	perdm_t *cur;
	perdm_t **tailp;
	perdm_t *fresh;
	syncq_t *newsq;

	ASSERT(str != NULL);
	ASSERT(qflag & (QPERMOD | QMTOUTPERIM));

	/* Fast path: an entry already exists. */
	rw_enter(&perdm_rwlock, RW_READER);
	cur = perdm_list;
	while (cur != NULL && cur->dm_str != str)
		cur = cur->dm_next;
	if (cur != NULL) {
		atomic_inc_32(&(cur->dm_ref));
		rw_exit(&perdm_rwlock);
		return (cur);
	}
	rw_exit(&perdm_rwlock);

	/* Build a candidate entry outside the lock. */
	newsq = new_syncq();
	if (qflag & QPERMOD) {
		newsq->sq_type = sqtype | SQ_PERMOD;
		newsq->sq_flags = sqtype & SQ_TYPES_IN_FLAGS;
	} else {
		ASSERT(qflag & QMTOUTPERIM);
		newsq->sq_onext = newsq->sq_oprev = newsq;
	}

	fresh = kmem_alloc(sizeof (perdm_t), KM_SLEEP);
	fresh->dm_sq = newsq;
	fresh->dm_str = str;
	fresh->dm_ref = 1;
	fresh->dm_next = NULL;

	/* Re-check under the writer lock; tailp ends at the list's tail link. */
	rw_enter(&perdm_rwlock, RW_WRITER);
	tailp = &perdm_list;
	while ((cur = *tailp) != NULL) {
		if (cur->dm_str == str) {
			/* Lost the race: use the existing entry. */
			cur->dm_ref++;
			rw_exit(&perdm_rwlock);
			free_syncq(newsq);
			kmem_free(fresh, sizeof (perdm_t));
			return (cur);
		}
		tailp = &(cur->dm_next);
	}

	*tailp = fresh;
	rw_exit(&perdm_rwlock);
	return (fresh);
}
/*
 * Drop one reference on dmp.  When the count reaches zero the entry is
 * unlinked from perdm_list under the writer lock and then, with the lock
 * released, wait_sq_svc() is called on its syncq before the syncq and the
 * entry itself are freed.
 */
void
rele_dm(perdm_t *dmp)
{
	perdm_t **linkp;
	perdm_t *victim;

	rw_enter(&perdm_rwlock, RW_WRITER);
	ASSERT(dmp->dm_ref > 0);

	dmp->dm_ref--;
	if (dmp->dm_ref > 0) {
		/* Still referenced by someone else. */
		rw_exit(&perdm_rwlock);
		return;
	}

	/* Locate the link that points at dmp and splice dmp out. */
	linkp = &perdm_list;
	while ((victim = *linkp) != NULL && victim != dmp)
		linkp = &(victim->dm_next);
	ASSERT(victim == dmp);
	*linkp = victim->dm_next;
	rw_exit(&perdm_rwlock);

	wait_sq_svc(victim->dm_sq);
	free_syncq(victim->dm_sq);
	kmem_free(victim, sizeof (perdm_t));
}
/*
 * Build a message from an optional control part and an optional data part.
 *
 * The control part is built with strmakectl() when mctl is non-NULL and
 * mctl->len >= 0; the data part with strmakedata() when *iosize >= 0.
 * *mpp receives the control block with the data chained behind it, the
 * data alone when there is no control block, or NULL when neither exists.
 *
 * Returns 0 on success or the error from the failing helper; on failure
 * *mpp is left NULL and any control block already built is freed.
 */
int
strmakemsg(
	struct strbuf *mctl,
	ssize_t *iosize,
	struct uio *uiop,
	stdata_t *stp,
	int32_t flag,
	mblk_t **mpp)
{
	mblk_t *ctlmp = NULL;
	mblk_t *datamp = NULL;
	int err;

	ASSERT(uiop != NULL);
	*mpp = NULL;

	if (mctl != NULL && mctl->len >= 0) {
		err = strmakectl(mctl, flag, uiop->uio_fmode, &ctlmp);
		if (err)
			return (err);
	}

	if (*iosize >= 0) {
		err = strmakedata(iosize, uiop, stp, flag, &datamp);
		if (err) {
			freemsg(ctlmp);
			return (err);
		}
	}

	/* Data only (possibly NULL). */
	if (ctlmp == NULL) {
		*mpp = datamp;
		return (0);
	}

	/* Control block first, data chained behind it when present. */
	if (datamp != NULL)
		linkb(ctlmp, datamp);
	*mpp = ctlmp;
	return (0);
}
/*
 * Build the control part of a message from the user buffer described by
 * mctl.
 *
 * When mctl is NULL or mctl->len is negative, *mpp is set to NULL and 0 is
 * returned.  Otherwise a block of at least 64 bytes is allocated, typed
 * M_PCPROTO if RS_HIPRI is set in flag (M_PROTO otherwise), and filled
 * with mctl->len bytes via copyin().
 *
 * Returns 0 on success, EAGAIN when allocation fails and fflag has FNDELAY
 * or FNONBLOCK, the strwaitbuf() error if waiting for memory fails, or
 * EFAULT if the copyin fails.
 */
int
strmakectl(
	struct strbuf *mctl,
	int32_t flag,
	int32_t fflag,
	mblk_t **mpp)
{
	cred_t *cr = CRED();
	mblk_t *ctlmp;
	caddr_t ubuf;
	int ctllen;
	int bufsz;
	int err;

	ASSERT(cr != NULL);
	*mpp = NULL;

	/* No control part requested. */
	if (mctl == NULL || mctl->len < 0)
		return (0);

	ctllen = mctl->len;
	ubuf = mctl->buf;
	bufsz = MAX(ctllen, 64);

	/* Retry the allocation until it succeeds or the wait fails. */
	for (;;) {
		ctlmp = allocb_cred(bufsz, cr, curproc->p_pid);
		if (ctlmp != NULL)
			break;
		if (fflag & (FNDELAY|FNONBLOCK))
			return (EAGAIN);
		err = strwaitbuf(bufsz, BPRI_MED);
		if (err)
			return (err);
	}

	ctlmp->b_datap->db_type = (flag & RS_HIPRI) ? M_PCPROTO : M_PROTO;

	if (copyin(ubuf, ctlmp->b_wptr, ctllen)) {
		freeb(ctlmp);
		return (EFAULT);
	}
	ctlmp->b_wptr += ctllen;

	*mpp = ctlmp;
	return (0);
}
/*
 * Build the data part of a message from uiop.
 *
 *   iosize - in: number of bytes to place in the message (negative means
 *            no data part); out: reduced to the number of bytes actually
 *            covered when allocation fails part-way through.
 *   uiop   - source of the data; uio_fmode is consulted for
 *            FNDELAY/FNONBLOCK.
 *   stp    - stream head supplying sd_wroff, sd_tail, sd_maxblk,
 *            sd_copyflag and the optional sd_wputdatafunc hook.
 *   flag   - STRUIO_POSTPONE skips the copy and marks each dblk instead.
 *   mpp    - out: the resulting chain of blocks (NULL on error).
 *
 * Data is split into blocks of at most sd_maxblk bytes (a single block
 * when sd_maxblk is INFPSZ); each block is allocated with sd_wroff +
 * sd_tail extra bytes and its read/write pointers start sd_wroff bytes in.
 *
 * Returns 0 on success, including the partial case described above, or
 * EAGAIN, the strwaitbuf() error, the uiomove() error, or ECOMM when the
 * sd_wputdatafunc hook returns NULL.
 */
int
strmakedata(
ssize_t *iosize,
struct uio *uiop,
stdata_t *stp,
int32_t flag,
mblk_t **mpp)
{
mblk_t *mp = NULL;
mblk_t *bp;
int wroff = (int)stp->sd_wroff;
int tail_len = (int)stp->sd_tail;
int extra = wroff + tail_len;
int error = 0;
ssize_t maxblk;
ssize_t count = *iosize;
cred_t *cr;
*mpp = NULL;
if (count < 0)
return (0);
cr = CRED();
ASSERT(cr != NULL);
maxblk = stp->sd_maxblk;
if (maxblk == INFPSZ)
maxblk = count;
/* do/while: a zero-length request still produces one (empty) block. */
do {
ssize_t size;
dblk_t *dp;
ASSERT(uiop);
size = MIN(count, maxblk);
while ((bp = allocb_cred(size + extra, cr,
curproc->p_pid)) == NULL) {
error = EAGAIN;
if ((uiop->uio_fmode & (FNDELAY|FNONBLOCK)) ||
(error = strwaitbuf(size + extra, BPRI_MED)) != 0) {
if (count == *iosize) {
/* Nothing built yet: fail outright. */
freemsg(mp);
return (error);
} else {
/* Hand back what was built; *iosize becomes the bytes covered. */
*iosize -= count;
*mpp = mp;
return (0);
}
}
}
dp = bp->b_datap;
dp->db_cpid = curproc->p_pid;
ASSERT(wroff <= dp->db_lim - bp->b_wptr);
bp->b_wptr = bp->b_rptr = bp->b_rptr + wroff;
if (flag & STRUIO_POSTPONE) {
/* No copy here: mark the dblk STRUIO_SPEC and just reserve the space. */
dp->db_struioflag = STRUIO_SPEC;
dp->db_cksumstart = 0;
dp->db_cksumstuff = 0;
dp->db_cksumend = size;
*(long long *)dp->db_struioun.data = 0ll;
bp->b_wptr += size;
} else {
if (stp->sd_copyflag & STRCOPYCACHED)
uiop->uio_extflg |= UIO_COPY_CACHED;
if (size != 0) {
error = uiomove(bp->b_wptr, size, UIO_WRITE,
uiop);
if (error != 0) {
freeb(bp);
freemsg(mp);
return (error);
}
}
bp->b_wptr += size;
/* Optional per-stream hook; it may return a replacement block. */
if (stp->sd_wputdatafunc != NULL) {
mblk_t *newbp;
newbp = (stp->sd_wputdatafunc)(stp->sd_vnode,
bp, NULL, NULL, NULL, NULL);
if (newbp == NULL) {
freeb(bp);
freemsg(mp);
return (ECOMM);
}
bp = newbp;
}
}
count -= size;
/* Append the block to the chain being built. */
if (mp == NULL)
mp = bp;
else
linkb(mp, bp);
} while (count > 0);
*mpp = mp;
return (0);
}
/*
 * Wait for a buffer of `size` bytes at priority `pri`.
 *
 * Registers a bufcall whose callback is cv_broadcast() on the calling
 * process's p_flag_cv, then sleeps interruptibly on that condition
 * variable under bcall_monitor.  The bufcall is cancelled before
 * returning in either case.
 *
 * Returns 0 after a normal wakeup, ENOSR if the bufcall could not be
 * registered, or EINTR if cv_wait_sig() reports an interruption.
 */
int
strwaitbuf(size_t size, int pri)
{
	kcondvar_t *cvp = &ttoproc(curthread)->p_flag_cv;
	bufcall_id_t bcid;
	int rc = 0;

	mutex_enter(&bcall_monitor);

	bcid = bufcall(size, pri, (void (*)(void *))cv_broadcast, cvp);
	if (bcid == 0) {
		mutex_exit(&bcall_monitor);
		return (ENOSR);
	}

	if (!cv_wait_sig(cvp, &bcall_monitor))
		rc = EINTR;

	unbufcall(bcid);
	mutex_exit(&bcall_monitor);
	return (rc);
}
/*
 * Sleep on the stream head until woken, interrupted, or timed out.
 *
 *   stp    - stream head; sd_lock must be held on entry (asserted) and is
 *            held again on every return.
 *   flag   - READWAIT/GETWAIT select the read-side wait (RSLEEP, read
 *            queue's q_wait); anything else selects the write side.
 *            STR_NOERROR, STR_PEEK, STR_NOSIG, STR_DELAYERR and NOINTR
 *            modify error and signal handling as shown below.
 *   count  - byte count placed in the M_READ message sent downstream when
 *            SNDMREAD is set.
 *   fmode  - file mode; FNDELAY/FNONBLOCK suppress sleeping.
 *   timout - timeout handed to str_cv_wait().
 *   done   - out: 1 when the caller should stop (error, non-blocking,
 *            interrupt, timeout), 0 when it should re-check the queue.
 *
 * Returns 0 or an errno value (stream error, EAGAIN, EINTR, ETIME, or an
 * allocb_wait() error).
 */
int
strwaitq(stdata_t *stp, int flag, ssize_t count, int fmode, clock_t timout,
int *done)
{
int slpflg, errs;
int error;
kcondvar_t *sleepon;
mblk_t *mp;
ssize_t *rd_count;
clock_t rval;
ASSERT(MUTEX_HELD(&stp->sd_lock));
/* Pick the sleep flag, condition variable and error mask for this side. */
if ((flag & READWAIT) || (flag & GETWAIT)) {
slpflg = RSLEEP;
sleepon = &_RD(stp->sd_wrq)->q_wait;
errs = STRDERR|STPLEX;
} else {
slpflg = WSLEEP;
sleepon = &stp->sd_wrq->q_wait;
errs = STWRERR|STRHUP|STPLEX;
}
if (flag & STR_NOERROR)
errs = STPLEX;
/* A wakeup is already recorded in sd_wakeq: consume it, do not sleep. */
if (stp->sd_wakeq & slpflg) {
stp->sd_wakeq &= ~slpflg;
*done = 0;
return (0);
}
/* Check for stream errors before sleeping. */
if (stp->sd_flag & errs) {
error = strgeterr(stp, errs, (flag & STR_PEEK));
if (error != 0) {
*done = 1;
return (error);
}
}
/*
 * SNDMREAD is set: send an M_READ message carrying `count` downstream.
 * sd_lock is dropped around the allocation and the putnext().
 */
if ((flag & READWAIT) && (stp->sd_flag & SNDMREAD)) {
mutex_exit(&stp->sd_lock);
if (!(mp = allocb_wait(sizeof (ssize_t), BPRI_MED,
(flag & STR_NOSIG), &error))) {
mutex_enter(&stp->sd_lock);
*done = 1;
return (error);
}
mp->b_datap->db_type = M_READ;
rd_count = (ssize_t *)mp->b_wptr;
*rd_count = count;
mp->b_wptr += sizeof (ssize_t);
stream_willservice(stp);
putnext(stp->sd_wrq, mp);
stream_runservice(stp);
mutex_enter(&stp->sd_lock);
/* Something is now on the read queue: let the caller look at it. */
if (_RD(stp->sd_wrq)->q_first != NULL) {
*done = 0;
return (0);
}
}
/* Non-blocking mode: never sleep. */
if (fmode & (FNDELAY|FNONBLOCK)) {
if (!(flag & NOINTR))
error = EAGAIN;
else
error = 0;
*done = 1;
return (error);
}
stp->sd_flag |= slpflg;
TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAIT2,
"strwaitq sleeps (2):%p, %X, %lX, %X, %p",
stp, flag, count, fmode, done);
/* rval > 0: woken; rval == 0: interrupted; rval < 0: timed out. */
rval = str_cv_wait(sleepon, &stp->sd_lock, timout, flag & STR_NOSIG);
if (rval > 0) {
TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAKE2,
"strwaitq awakes(2):%X, %X, %X, %X, %X",
stp, flag, count, fmode, done);
} else if (rval == 0) {
TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_INTR2,
"strwaitq interrupt #2:%p, %X, %lX, %X, %p",
stp, flag, count, fmode, done);
/* Clear our sleep flag and pass the wakeup on to other sleepers. */
stp->sd_flag &= ~slpflg;
cv_broadcast(sleepon);
if (!(flag & NOINTR))
error = EINTR;
else
error = 0;
*done = 1;
return (error);
} else {
TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_TIME,
"strwaitq timeout:%p, %X, %lX, %X, %p",
stp, flag, count, fmode, done);
*done = 1;
if (!(flag & NOINTR))
return (ETIME);
else
return (0);
}
/* Woken normally: re-check stream errors unless the caller delays them. */
if ((stp->sd_flag & errs) && !(flag & STR_DELAYERR)) {
error = strgeterr(stp, errs, (flag & STR_PEEK));
if (error != 0) {
*done = 1;
return (error);
}
}
*done = 0;
return (0);
}
/*
 * True when signal `sig` is in process p's ignore set or is blocked by
 * thread t, i.e. when posting it would have no effect on the caller.
 */
#define cantsend(p, t, sig) \
(sigismember(&(p)->p_ignore, sig) || signal_is_blocked((t), sig))
/*
 * Job-control access check for the calling process on stream stp.
 *
 * Called with stp->sd_lock held (asserted); the lock is held again on
 * every return, although it is dropped temporarily around pgsignal() and
 * while waiting on lbolt_cv.
 *
 * Returns 0 when access is allowed, EIO when it must be refused, or EINTR
 * when the wait is interrupted by a signal.  Otherwise the function posts
 * SIGTTIN/SIGTTOU to the caller's process group and loops to test again.
 */
int
straccess(struct stdata *stp, enum jcaccess mode)
{
extern kcondvar_t lbolt_cv;
kthread_t *t = curthread;
proc_t *p = ttoproc(t);
sess_t *sp;
ASSERT(mutex_owned(&stp->sd_lock));
/* Streams with no session id, and FIFOs, are not subject to job control. */
if (stp->sd_sidp == NULL || stp->sd_vnode->v_type == VFIFO)
return (0);
mutex_enter(&p->p_lock);
for (;;) {
mutex_enter(&p->p_splock);
sp = p->p_sessp;
mutex_enter(&sp->s_lock);
/*
 * Allow access if this stream is not the session's device, or if the
 * caller's process group is the stream's process group.
 */
if (sp->s_dev != stp->sd_vnode->v_rdev ||
p->p_pgidp == stp->sd_pgidp) {
mutex_exit(&sp->s_lock);
mutex_exit(&p->p_splock);
mutex_exit(&p->p_lock);
return (0);
}
/* The session has no vnode (s_vp == NULL): post SIGHUP if deliverable. */
if (sp->s_vp == NULL) {
if (!cantsend(p, t, SIGHUP))
sigtoproc(p, t, SIGHUP);
mutex_exit(&sp->s_lock);
mutex_exit(&p->p_splock);
mutex_exit(&p->p_lock);
return (EIO);
}
mutex_exit(&sp->s_lock);
mutex_exit(&p->p_splock);
/* JCGETP is always permitted at this point. */
if (mode == JCGETP) {
mutex_exit(&p->p_lock);
return (0);
}
if (mode == JCREAD) {
/* The read cannot be stopped by SIGTTIN: fail it with EIO. */
if (p->p_detached || cantsend(p, t, SIGTTIN)) {
mutex_exit(&p->p_lock);
return (EIO);
}
mutex_exit(&p->p_lock);
mutex_exit(&stp->sd_lock);
pgsignal(p->p_pgidp, SIGTTIN);
mutex_enter(&stp->sd_lock);
mutex_enter(&p->p_lock);
} else {
/*
 * Any other mode: a JCWRITE without STRTOSTOP, or a caller that cannot
 * receive SIGTTOU, is allowed through.
 */
if ((mode == JCWRITE && !(stp->sd_flag & STRTOSTOP)) ||
cantsend(p, t, SIGTTOU)) {
mutex_exit(&p->p_lock);
return (0);
}
if (p->p_detached) {
mutex_exit(&p->p_lock);
return (EIO);
}
mutex_exit(&p->p_lock);
mutex_exit(&stp->sd_lock);
pgsignal(p->p_pgidp, SIGTTOU);
mutex_enter(&stp->sd_lock);
mutex_enter(&p->p_lock);
}
/*
 * Wait on lbolt_cv with sd_lock dropped.  A zero return from
 * cv_wait_sig_swap() is reported as EINTR; otherwise loop and re-test.
 */
mutex_exit(&stp->sd_lock);
if (!cv_wait_sig_swap(&lbolt_cv, &p->p_lock)) {
mutex_exit(&p->p_lock);
mutex_enter(&stp->sd_lock);
return (EINTR);
}
/* Re-take the locks in sd_lock -> p_lock order before looping. */
mutex_exit(&p->p_lock);
mutex_enter(&stp->sd_lock);
mutex_enter(&p->p_lock);
}
}
/*
 * Return the number of bytes in the leading run of blocks of message bp
 * that share the first block's type, i.e. the sum of b_wptr - b_rptr
 * until the b_cont chain ends or a block of a different db_type is found.
 * bp must not be NULL.
 */
size_t
xmsgsize(mblk_t *bp)
{
	unsigned char first_type = bp->b_datap->db_type;
	size_t nbytes = 0;

	while (bp != NULL && bp->b_datap->db_type == first_type) {
		ASSERT(bp->b_wptr >= bp->b_rptr);
		nbytes += bp->b_wptr - bp->b_rptr;
		bp = bp->b_cont;
	}
	return (nbytes);
}
/*
 * Allocate a stream head from stream_head_cache and reset the fields
 * listed below.  sd_wrq is pointed at the write side of qp; the read-side
 * handlers default to strrput_proto/strrput_misc and all other hooks are
 * cleared.  Fields not assigned here keep whatever the cache provides.
 */
struct stdata *
shalloc(queue_t *qp)
{
	stdata_t *sh = kmem_cache_alloc(stream_head_cache, KM_SLEEP);

	/* Identity and linkage. */
	sh->sd_wrq = _WR(qp);
	sh->sd_strtab = NULL;
	sh->sd_mate = NULL;
	sh->sd_freezer = NULL;
	sh->sd_t_audit_data = NULL;

	/* Counters and state words. */
	sh->sd_iocid = 0;
	sh->sd_refcnt = 0;
	sh->sd_wakeq = 0;
	sh->sd_anchor = 0;
	sh->sd_copyflag = 0;

	/* Synchronous stream (struio) state. */
	sh->sd_struiowrq = NULL;
	sh->sd_struiordq = NULL;
	sh->sd_struiodnak = 0;
	sh->sd_struionak = NULL;

	/* Put/read options and handler hooks. */
	sh->sd_rput_opt = 0;
	sh->sd_wput_opt = 0;
	sh->sd_read_opt = 0;
	sh->sd_rprotofunc = strrput_proto;
	sh->sd_rmiscfunc = strrput_misc;
	sh->sd_rderrfunc = sh->sd_wrerrfunc = NULL;
	sh->sd_rputdatafunc = sh->sd_wputdatafunc = NULL;

	/* ciputctrl array. */
	sh->sd_ciputctrl = NULL;
	sh->sd_nciputctrl = 0;

	/* Per-stream service scheduling list. */
	sh->sd_qhead = NULL;
	sh->sd_qtail = NULL;
	sh->sd_servid = NULL;
	sh->sd_nqueues = 0;
	sh->sd_svcflags = 0;

	return (sh);
}
/*
 * Release a stream head back to stream_head_cache.
 *
 * The caller must not hold sd_lock (asserted).  The function waits until
 * STRS_SCHEDULED is clear in sd_svcflags, returns any ciputctrl array to
 * its cache, and asserts that the stream's service queue list is empty
 * before freeing the structure.
 */
void
shfree(stdata_t *stp)
{
ASSERT(MUTEX_NOT_HELD(&stp->sd_lock));
stp->sd_wrq = NULL;
/* Wait out any scheduled service work for this stream. */
mutex_enter(&stp->sd_qlock);
while (stp->sd_svcflags & STRS_SCHEDULED) {
STRSTAT(strwaits);
cv_wait(&stp->sd_qcv, &stp->sd_qlock);
}
mutex_exit(&stp->sd_qlock);
if (stp->sd_ciputctrl != NULL) {
ASSERT(stp->sd_nciputctrl == n_ciputctrl - 1);
SUMCHECK_CIPUTCTRL_COUNTS(stp->sd_ciputctrl,
stp->sd_nciputctrl, 0);
ASSERT(ciputctrl_cache != NULL);
kmem_cache_free(ciputctrl_cache, stp->sd_ciputctrl);
stp->sd_ciputctrl = NULL;
stp->sd_nciputctrl = 0;
}
ASSERT(stp->sd_qhead == NULL);
ASSERT(stp->sd_qtail == NULL);
ASSERT(stp->sd_nqueues == 0);
kmem_cache_free(stream_head_cache, stp);
}
/*
 * Allocate a queue pair (queinfo_t) from queue_cache and reset it.
 *
 * Both queues are marked QUSE (the read queue also QREADR) and share the
 * syncq embedded in the queinfo_t, which is reset as well.  Returns the
 * read queue of the pair.
 */
queue_t *
allocq(void)
{
	queinfo_t *qinfo = kmem_cache_alloc(queue_cache, KM_SLEEP);
	queue_t *rdq = &qinfo->qu_rqueue;
	queue_t *wrq = &qinfo->qu_wqueue;
	syncq_t *inner = &qinfo->qu_syncq;
	queue_t *both[2];
	int side;

	/* Fields that are reset identically on the read and write queue. */
	both[0] = rdq;
	both[1] = wrq;
	for (side = 0; side < 2; side++) {
		queue_t *cur = both[side];

		cur->q_last = NULL;
		cur->q_next = NULL;
		cur->q_ptr = NULL;
		cur->q_bandp = NULL;
		cur->q_stream = NULL;
		cur->q_syncq = inner;
		cur->q_nband = 0;
		cur->q_nfsrv = NULL;
		cur->q_draining = 0;
		cur->q_syncqmsgs = 0;
		cur->q_spri = 0;
		cur->q_qtstamp = 0;
		cur->q_sqtstamp = 0;
	}

	/* Side-specific fields; q_fp is only reset on the read queue. */
	rdq->q_flag = QUSE | QREADR;
	rdq->q_fp = NULL;
	wrq->q_flag = QUSE;

	/* Embedded syncq. */
	inner->sq_count = 0;
	inner->sq_rmqcount = 0;
	inner->sq_flags = 0;
	inner->sq_type = 0;
	inner->sq_callbflags = 0;
	inner->sq_cancelid = 0;
	inner->sq_ciputctrl = NULL;
	inner->sq_nciputctrl = 0;
	inner->sq_needexcl = 0;
	inner->sq_svcflags = 0;

	return (rdq);
}
/*
 * Tear down and free the queue pair whose read queue is qp.
 *
 * Waits for service procedures via wait_svc(), flushes both syncqs and
 * both queues, detaches the syncqs from any outer perimeter, frees syncqs
 * that are not the pair's embedded one (a read-side syncq is kept when
 * QPERMOD is set), releases the embedded syncq's ciputctrl array and all
 * qbands, and finally returns the pair to queue_cache.
 */
void
freeq(queue_t *qp)
{
qband_t *qbp, *nqbp;
syncq_t *sq, *outer;
queue_t *wqp = _WR(qp);
ASSERT(qp->q_flag & QREADR);
wait_svc(qp);
(void) flush_syncq(qp->q_syncq, qp);
(void) flush_syncq(wqp->q_syncq, wqp);
ASSERT(qp->q_syncqmsgs == 0 && wqp->q_syncqmsgs == 0);
flushq(qp, FLUSHALL);
flushq(wqp, FLUSHALL);
qp->q_next = wqp->q_next = NULL;
ASSERT(!(qp->q_flag & QENAB));
ASSERT(!(wqp->q_flag & QENAB));
/* Detach from the outer perimeter, if there is one. */
outer = qp->q_syncq->sq_outer;
if (outer != NULL) {
outer_remove(outer, qp->q_syncq);
if (wqp->q_syncq != qp->q_syncq)
outer_remove(outer, wqp->q_syncq);
}
/* Free syncqs other than the embedded one; skip the read side for QPERMOD. */
if (qp->q_syncq != SQ(qp) && !(qp->q_flag & QPERMOD))
free_syncq(qp->q_syncq);
if (qp->q_syncq != wqp->q_syncq && wqp->q_syncq != SQ(qp))
free_syncq(wqp->q_syncq);
ASSERT((qp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0);
ASSERT((wqp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0);
ASSERT(MUTEX_NOT_HELD(QLOCK(qp)));
ASSERT(MUTEX_NOT_HELD(QLOCK(wqp)));
/* The embedded syncq must be idle and unlinked before it goes away. */
sq = SQ(qp);
ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
ASSERT(sq->sq_outer == NULL);
ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
ASSERT(sq->sq_callbpend == NULL);
ASSERT(sq->sq_needexcl == 0);
if (sq->sq_ciputctrl != NULL) {
ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
SUMCHECK_CIPUTCTRL_COUNTS(sq->sq_ciputctrl,
sq->sq_nciputctrl, 0);
ASSERT(ciputctrl_cache != NULL);
kmem_cache_free(ciputctrl_cache, sq->sq_ciputctrl);
sq->sq_ciputctrl = NULL;
sq->sq_nciputctrl = 0;
}
ASSERT(qp->q_first == NULL && wqp->q_first == NULL);
ASSERT(qp->q_count == 0 && wqp->q_count == 0);
ASSERT(qp->q_mblkcnt == 0 && wqp->q_mblkcnt == 0);
qp->q_flag &= ~QUSE;
wqp->q_flag &= ~QUSE;
/* Free the qband lists of both queues. */
qbp = qp->q_bandp;
while (qbp) {
nqbp = qbp->qb_next;
freeband(qbp);
qbp = nqbp;
}
qbp = wqp->q_bandp;
while (qbp) {
nqbp = qbp->qb_next;
freeband(qbp);
qbp = nqbp;
}
/*
 * NOTE(review): qp is passed to the cache that allocq() draws queinfo_t
 * objects from; this presumably relies on the read queue being the
 * first member of queinfo_t - confirm against the structure definition.
 */
kmem_cache_free(queue_cache, qp);
}
/*
 * Allocate a qband from qband_cache without sleeping and reset its link,
 * counters, message pointers and flags.  Returns NULL when the cache
 * cannot satisfy the request.
 */
qband_t *
allocband(void)
{
	qband_t *band = kmem_cache_alloc(qband_cache, KM_NOSLEEP);

	if (band != NULL) {
		band->qb_next = NULL;
		band->qb_first = NULL;
		band->qb_last = NULL;
		band->qb_count = 0;
		band->qb_mblkcnt = 0;
		band->qb_flag = 0;
	}
	return (band);
}
/*
 * Return a qband to qband_cache.
 */
void
freeband(qband_t *qbp)
{
kmem_cache_free(qband_cache, qbp);
}
/*
 * Allocate a zero-length message of the given type with allocb_wait() and
 * pass it to the next queue with putnext().
 *
 * Returns 1 when the message was sent.  Returns 0 without sending anything
 * when `type` is a data message type other than M_DELAY, or when the
 * allocation fails.
 */
int
putnextctl_wait(queue_t *q, int type)
{
	mblk_t *ctl;
	int err;

	/* Data message types are refused, with M_DELAY as the one exception. */
	if (datamsg(type) && (type != M_DELAY))
		return (0);

	ctl = allocb_wait(0, BPRI_HI, 0, &err);
	if (ctl == NULL)
		return (0);

	ctl->b_datap->db_type = (unsigned char)type;
	putnext(q, ctl);
	return (1);
}
/*
 * Run pending bufcall callbacks from the strbcalls list.
 *
 * At most as many entries as were on the list on entry are examined.  An
 * entry whose requested size is no larger than the kmem_avail() sampled
 * at the start has its callback invoked (with strbcall_lock dropped) and
 * is then freed; a larger entry is moved to the tail of the list for a
 * later pass.  bcall_monitor is held for the whole function.
 */
void
runbufcalls(void)
{
strbufcall_t *bcp;
mutex_enter(&bcall_monitor);
mutex_enter(&strbcall_lock);
if (strbcalls.bc_head) {
size_t count;
int nevent;
/* Count the entries so each one is visited at most once this pass. */
nevent = 0;
for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next)
nevent++;
count = kmem_avail();
while ((bcp = strbcalls.bc_head) != NULL && nevent) {
STRSTAT(bufcalls);
--nevent;
if (bcp->bc_size <= count) {
/*
 * Record the executing thread while the callback runs unlocked, then
 * clear it and wake bcall_cv waiters.
 */
bcp->bc_executor = curthread;
mutex_exit(&strbcall_lock);
(*bcp->bc_func)(bcp->bc_arg);
mutex_enter(&strbcall_lock);
bcp->bc_executor = NULL;
cv_broadcast(&bcall_cv);
strbcalls.bc_head = bcp->bc_next;
kmem_free(bcp, sizeof (strbufcall_t));
} else {
/* Too big for now: rotate it to the tail unless it is the only entry. */
if (bcp->bc_next != NULL) {
strbcalls.bc_head = bcp->bc_next;
bcp->bc_next = NULL;
strbcalls.bc_tail->bc_next = bcp;
strbcalls.bc_tail = bcp;
}
}
}
if (strbcalls.bc_head == NULL)
strbcalls.bc_tail = NULL;
}
mutex_exit(&strbcall_lock);
mutex_exit(&bcall_monitor);
}
/*
 * Run the service procedure of queue q inside its syncq (SQ_SVC).
 *
 * The procedure is skipped when QWCLOSE is set.  After each pass, if
 * QENAB was set in the meantime it is cleared and the procedure is run
 * again.  Once no re-enable is pending, QINSERVICE and QBACK are cleared
 * together with QB_BACK on every band, and waiters on q_wait are woken.
 */
static void
runservice(queue_t *q)
{
	qband_t *band;

	ASSERT(q->q_qinfo->qi_srvp);

	for (;;) {
		entersq(q->q_syncq, SQ_SVC);
		TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_START,
		    "runservice starts:%p", q);

		if (!(q->q_flag & QWCLOSE))
			(*q->q_qinfo->qi_srvp)(q);

		TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_END,
		    "runservice ends:(%p)", q);
		leavesq(q->q_syncq, SQ_SVC);

		mutex_enter(QLOCK(q));
		if (!(q->q_flag & QENAB))
			break;

		/* Re-enabled while running: consume the request and go again. */
		q->q_flag &= ~QENAB;
		mutex_exit(QLOCK(q));
	}

	/* QLOCK(q) is still held here. */
	q->q_flag &= ~(QINSERVICE | QBACK);
	for (band = q->q_bandp; band != NULL; band = band->qb_next)
		band->qb_flag &= ~QB_BACK;
	cv_broadcast(&q->q_wait);
	mutex_exit(QLOCK(q));
}
/*
 * Body of the bufcall service thread; never returns.
 *
 * Each iteration: if bufcalls are pending and kmem_avail() is non-zero,
 * runbufcalls() is invoked with strbcall_lock dropped.  If entries are
 * still pending afterwards, the thread waits up to 60 seconds on
 * memavail_cv; if none are pending it waits on strbcall_cv.  Both waits
 * are bracketed with the CALLB_CPR_SAFE_BEGIN/END macros.
 */
void
streams_bufcall_service(void)
{
callb_cpr_t cprinfo;
CALLB_CPR_INIT(&cprinfo, &strbcall_lock, callb_generic_cpr,
"streams_bufcall_service");
mutex_enter(&strbcall_lock);
for (;;) {
if (strbcalls.bc_head != NULL && kmem_avail() > 0) {
mutex_exit(&strbcall_lock);
runbufcalls();
mutex_enter(&strbcall_lock);
}
/* Still have unsatisfied requests: wait for memory, with a timeout. */
if (strbcalls.bc_head != NULL) {
STRSTAT(bcwaits);
CALLB_CPR_SAFE_BEGIN(&cprinfo);
(void) cv_reltimedwait(&memavail_cv, &strbcall_lock,
SEC_TO_TICK(60), TR_CLOCK_TICK);
CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock);
}
/* Nothing pending: sleep until strbcall_cv is signalled. */
if (strbcalls.bc_head == NULL) {
CALLB_CPR_SAFE_BEGIN(&cprinfo);
cv_wait(&strbcall_cv, &strbcall_lock);
CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock);
}
}
}
/*
 * Body of the background queue service thread; never returns.
 *
 * Sleeps on services_to_run until freebs_list or the qhead list is
 * non-empty, then frees every mblk on freebs_list with mblk_free() and
 * runs queue_service() on every queue taken off qhead/qtail.  The
 * service_queue mutex is dropped around each mblk_free() and
 * queue_service() call.
 */
static void
streams_qbkgrnd_service(void)
{
callb_cpr_t cprinfo;
queue_t *q;
CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr,
"streams_bkgrnd_service");
mutex_enter(&service_queue);
for (;;) {
while ((freebs_list == NULL) && (qhead == NULL)) {
CALLB_CPR_SAFE_BEGIN(&cprinfo);
cv_wait(&services_to_run, &service_queue);
CALLB_CPR_SAFE_END(&cprinfo, &service_queue);
}
/* Drain the deferred-free mblk list, one mblk at a time. */
while (freebs_list != NULL) {
mblk_t *mp = freebs_list;
freebs_list = mp->b_next;
mutex_exit(&service_queue);
mblk_free(mp);
mutex_enter(&service_queue);
}
/* Service every queue currently on the qhead/qtail list. */
while (qhead != NULL) {
DQ(q, qhead, qtail, q_link);
ASSERT(q != NULL);
mutex_exit(&service_queue);
queue_service(q);
mutex_enter(&service_queue);
}
ASSERT(qhead == NULL && qtail == NULL);
}
}
/*
 * Body of the background syncq service thread; never returns.
 *
 * Sleeps on syncqs_to_run until the sqhead list is non-empty, then takes
 * each syncq off sqhead/sqtail and runs syncq_service() on it with the
 * service_queue mutex dropped.  Every syncq found on the list is asserted
 * to have SQ_BGTHREAD set.
 */
static void
streams_sqbkgrnd_service(void)
{
callb_cpr_t cprinfo;
syncq_t *sq;
CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr,
"streams_sqbkgrnd_service");
mutex_enter(&service_queue);
for (;;) {
while (sqhead == NULL) {
CALLB_CPR_SAFE_BEGIN(&cprinfo);
cv_wait(&syncqs_to_run, &service_queue);
CALLB_CPR_SAFE_END(&cprinfo, &service_queue);
}
while (sqhead != NULL) {
DQ(sq, sqhead, sqtail, sq_next);
ASSERT(sq != NULL);
ASSERT(sq->sq_svcflags & SQ_BGTHREAD);
mutex_exit(&service_queue);
syncq_service(sq);
mutex_enter(&service_queue);
}
}
}
/*
 * Disable further scheduling of syncq sq and wait until no service
 * activity for it remains.
 *
 * SQ_DISABLED is set first.  If the syncq is marked SQ_BGTHREAD, an
 * attempt is made to pull it off the sqhead/sqtail list; if that succeeds
 * the pending service is cancelled and there is nothing to wait for.
 * Otherwise the function sleeps on sq_wait until sq_servcount drops to
 * zero.
 */
void
wait_sq_svc(syncq_t *sq)
{
mutex_enter(SQLOCK(sq));
sq->sq_svcflags |= SQ_DISABLED;
if (sq->sq_svcflags & SQ_BGTHREAD) {
syncq_t *sq_chase;
syncq_t *sq_curr;
int removed;
ASSERT(sq->sq_servcount == 1);
mutex_enter(&service_queue);
RMQ(sq, sqhead, sqtail, sq_next, sq_chase, sq_curr, removed);
mutex_exit(&service_queue);
if (removed) {
/* Still on the list, so it had not started running: cancel it. */
sq->sq_svcflags &= ~SQ_BGTHREAD;
sq->sq_servcount = 0;
STRSTAT(sqremoved);
goto done;
}
}
/* Request a wakeup (SQ_WANTWAKEUP) and wait for the count to reach zero. */
while (sq->sq_servcount != 0) {
sq->sq_flags |= SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
}
done:
mutex_exit(SQLOCK(sq));
}
/*
 * Schedule syncq sq for servicing.  Called with SQLOCK(sq) held
 * (asserted).
 *
 * Nothing is done when the syncq is disabled or already queued for the
 * background thread.  When a service is already outstanding, another is
 * only dispatched for an SQ_PERMOD syncq whose sq_servcount is below
 * MIN(sq_nqueues, ncpus_online).  Dispatch is attempted on streams_taskq
 * first; if that fails and no service is outstanding, the syncq is put on
 * the sqhead/sqtail list and syncqs_to_run is signalled.
 */
void
sqenable(syncq_t *sq)
{
ASSERT(MUTEX_HELD(SQLOCK(sq)));
IMPLY(sq->sq_servcount == 0, sq->sq_next == NULL);
IMPLY(sq->sq_next != NULL, sq->sq_svcflags & SQ_BGTHREAD);
if (sq->sq_svcflags & (SQ_DISABLED | SQ_BGTHREAD))
return;
/* Limit the number of concurrent services for this syncq. */
if (sq->sq_servcount != 0) {
if (((sq->sq_type & SQ_PERMOD) == 0) ||
(sq->sq_servcount >= MIN(sq->sq_nqueues, ncpus_online))) {
STRSTAT(sqtoomany);
return;
}
}
sq->sq_tstamp = ddi_get_lbolt();
STRSTAT(sqenables);
/* Try the task queue first, without sleeping or queueing. */
sq->sq_servid = (void *)taskq_dispatch(streams_taskq,
(task_func_t *)syncq_service, sq, TQ_NOSLEEP | TQ_NOQUEUE);
if (sq->sq_servid != NULL) {
sq->sq_servcount++;
return;
}
/* Dispatch failed, but an earlier dispatch is still outstanding. */
if (sq->sq_servcount != 0)
return;
/* Fall back to the background thread's list. */
mutex_enter(&service_queue);
STRSTAT(taskqfails);
ENQUEUE(sq, sqhead, sqtail, sq_next);
sq->sq_svcflags |= SQ_BGTHREAD;
sq->sq_servcount = 1;
cv_signal(&syncqs_to_run);
mutex_exit(&service_queue);
}
/*
 * Queue mblk mp (whose dblk is dbp) on an esballoc free queue for
 * deferred freeing.
 *
 * The queue is chosen from system_esbq_array by the current CPU's
 * sequence id shifted right by esbq_log2_cpus_per_q.  If the array does
 * not exist yet it is allocated here once kmem_ready is set; when it is
 * still unavailable the single system_esbq is used instead.  Panics if
 * the dblk's free routine is NULL.
 */
void
freebs_enqueue(mblk_t *mp, dblk_t *dbp)
{
int qindex = CPU->cpu_seqid >> esbq_log2_cpus_per_q;
esb_queue_t *eqp;
ASSERT(dbp->db_mblk == mp);
ASSERT(qindex < esbq_nelem);
eqp = system_esbq_array;
if (eqp != NULL) {
eqp += qindex;
} else {
/* Try to create the per-CPU-group array; the allocation may fail. */
mutex_enter(&esbq_lock);
if (kmem_ready && system_esbq_array == NULL)
system_esbq_array = (esb_queue_t *)kmem_zalloc(
esbq_nelem * sizeof (esb_queue_t), KM_NOSLEEP);
mutex_exit(&esbq_lock);
eqp = system_esbq_array;
if (eqp != NULL)
eqp += qindex;
else
eqp = &system_esbq;
}
/* Catch a missing free callback now rather than when it is invoked. */
if (dbp->db_frtnp->free_func == NULL) {
panic("freebs_enqueue: dblock %p has a NULL free callback",
(void *)dbp);
}
mutex_enter(&eqp->eq_lock);
/* Append mp to the tail of the chosen queue. */
if (eqp->eq_head == NULL) {
eqp->eq_head = eqp->eq_tail = mp;
} else {
eqp->eq_tail->b_next = mp;
eqp->eq_tail = mp;
}
eqp->eq_len++;
/* Process the queue once it reaches esbq_max_qlen, unless already underway. */
if (eqp->eq_len >= esbq_max_qlen &&
!(eqp->eq_flags & ESBQ_PROCESSING))
esballoc_process_queue(eqp);
esballoc_set_timer(eqp, esbq_timeout);
mutex_exit(&eqp->eq_lock);
}
/*
 * Detach the whole message chain from eqp and hand it to
 * esballoc_enqueue_mblk(), repeating while the queue has refilled to at
 * least esbq_max_qlen in the meantime.
 *
 * Called with eq_lock held (asserted).  The lock is dropped around each
 * esballoc_enqueue_mblk() call and is held again on return.
 * ESBQ_PROCESSING is set for the duration.  The queue must be non-empty
 * on entry, because eq_tail is dereferenced unconditionally.
 */
static void
esballoc_process_queue(esb_queue_t *eqp)
{
mblk_t *mp;
ASSERT(MUTEX_HELD(&eqp->eq_lock));
eqp->eq_flags |= ESBQ_PROCESSING;
do {
/* Terminate the chain and reset the queue to empty. */
mp = eqp->eq_head;
eqp->eq_tail->b_next = NULL;
eqp->eq_head = eqp->eq_tail = NULL;
eqp->eq_len = 0;
mutex_exit(&eqp->eq_lock);
esballoc_enqueue_mblk(mp);
mutex_enter(&eqp->eq_lock);
} while ((eqp->eq_len >= esbq_max_qlen) && (eqp->eq_len > 0));
eqp->eq_flags &= ~ESBQ_PROCESSING;
}
/*
 * Free every mblk on the b_next-linked chain starting at mp.  Each mblk's
 * b_next is cleared before it is passed to mblk_free().
 */
static void
esballoc_mblk_free(mblk_t *mp)
{
	while (mp != NULL) {
		mblk_t *rest = mp->b_next;

		mp->b_next = NULL;
		mblk_free(mp);
		mp = rest;
	}
}
/*
 * Arrange for the b_next-linked chain starting at mp to be freed
 * asynchronously.
 *
 * The chain is dispatched to system_taskq to run esballoc_mblk_free().
 * If the dispatch fails, the whole chain is prepended to freebs_list
 * under the service_queue mutex and services_to_run is signalled.
 */
static void
esballoc_enqueue_mblk(mblk_t *mp)
{
	mblk_t *last;

	if (taskq_dispatch(system_taskq, (task_func_t *)esballoc_mblk_free, mp,
	    TQ_NOSLEEP) != TASKQID_INVALID)
		return;

	mutex_enter(&service_queue);
	STRSTAT(taskqfails);

	/* Find the end of our chain and splice the existing list after it. */
	for (last = mp; last->b_next != NULL; last = last->b_next)
		continue;
	last->b_next = freebs_list;
	freebs_list = mp;

	cv_signal(&services_to_run);
	mutex_exit(&service_queue);
}
/*
 * Timeout handler for an esballoc queue (arg is the esb_queue_t).
 *
 * Clears ESBQ_TIMER, processes the queue if it is non-empty and not
 * already being processed, and then calls esballoc_set_timer(), which
 * re-arms the timer only if messages remain queued.
 */
static void
esballoc_timer(void *arg)
{
esb_queue_t *eqp = arg;
mutex_enter(&eqp->eq_lock);
eqp->eq_flags &= ~ESBQ_TIMER;
if (!(eqp->eq_flags & ESBQ_PROCESSING) &&
eqp->eq_len > 0)
esballoc_process_queue(eqp);
esballoc_set_timer(eqp, esbq_timeout);
mutex_exit(&eqp->eq_lock);
}
/*
 * Arm the esballoc timer for eqp if the queue is non-empty and no timer
 * is already pending (ESBQ_TIMER clear).  Called with eq_lock held
 * (asserted).
 */
static void
esballoc_set_timer(esb_queue_t *eqp, clock_t eq_timeout)
{
	ASSERT(MUTEX_HELD(&eqp->eq_lock));

	/* Nothing queued, or a timer is already outstanding. */
	if (!(eqp->eq_len > 0) || (eqp->eq_flags & ESBQ_TIMER))
		return;

	(void) timeout(esballoc_timer, eqp, eq_timeout);
	eqp->eq_flags |= ESBQ_TIMER;
}
/*
 * Initialize the esballoc queue sizing parameters and the fallback
 * system_esbq.
 *
 * esbq_cpus_per_q is replaced by 1 << highbit(esbq_cpus_per_q - 1)
 * (presumably rounding it up to a power of two - confirm against
 * highbit()), and esbq_nelem is set to the number of such groups needed
 * to cover NCPU.
 */
void
esballoc_queue_init(void)
{
esbq_log2_cpus_per_q = highbit(esbq_cpus_per_q - 1);
esbq_cpus_per_q = 1 << esbq_log2_cpus_per_q;
esbq_nelem = howmany(NCPU, esbq_cpus_per_q);
system_esbq.eq_len = 0;
system_esbq.eq_head = system_esbq.eq_tail = NULL;
system_esbq.eq_flags = 0;
}
/*
 * Mark priority band `pri` of queue q as back-enabled.  Called with
 * QLOCK(q) held (asserted).
 *
 * For pri == 0 the QBACK flag is set on the queue itself.  Otherwise
 * QB_BACK is set on the pri'th qband, allocating any missing bands (with
 * the queue's water marks) first.  If a band cannot be allocated, a
 * warning is logged and the function returns without setting the flag;
 * bands allocated up to that point are kept.
 */
void
setqback(queue_t *q, unsigned char pri)
{
	qband_t **tailp;
	qband_t *band;
	int hops;

	ASSERT(MUTEX_HELD(QLOCK(q)));

	if (pri == 0) {
		q->q_flag |= QBACK;
		return;
	}

	if (pri > q->q_nband) {
		/* Find the tail link of the band list, then extend it. */
		tailp = &q->q_bandp;
		while (*tailp != NULL)
			tailp = &(*tailp)->qb_next;

		while (pri > q->q_nband) {
			band = allocband();
			*tailp = band;
			if (band == NULL) {
				cmn_err(CE_WARN,
				    "setqback: can't allocate qband\n");
				return;
			}
			band->qb_hiwat = q->q_hiwat;
			band->qb_lowat = q->q_lowat;
			q->q_nband++;
			tailp = &band->qb_next;
		}
	}

	/* Bands are numbered from 1, so band `pri` is pri - 1 links in. */
	band = q->q_bandp;
	for (hops = pri - 1; hops > 0; hops--)
		band = band->qb_next;
	band->qb_flag |= QB_BACK;
}
/*
 * Copy `len' bytes from `from' to `to'.
 *
 * With U_TO_K set in `copyflag' the copy goes through copyin() and EFAULT
 * is returned if that fails; otherwise (K_TO_K expected) bcopy() is used.
 * Returns 0 on success.
 */
int
strcopyin(void *from, void *to, size_t len, int copyflag)
{
	if (!(copyflag & U_TO_K)) {
		ASSERT(copyflag & K_TO_K);
		bcopy(from, to, len);
		return (0);
	}

	ASSERT((copyflag & K_TO_K) == 0);
	if (copyin(from, to, len) != 0)
		return (EFAULT);
	return (0);
}
/*
 * Copy `len' bytes from `from' to `to'.
 *
 * With U_TO_K set in `copyflag' the copy goes through copyout() and EFAULT
 * is returned if that fails; otherwise (K_TO_K expected) bcopy() is used.
 * Returns 0 on success.
 */
int
strcopyout(void *from, void *to, size_t len, int copyflag)
{
	if (!(copyflag & U_TO_K)) {
		ASSERT(copyflag & K_TO_K);
		bcopy(from, to, len);
		return (0);
	}

	if (copyout(from, to, len) != 0)
		return (EFAULT);
	return (0);
}
/*
 * Deliver signal `sig' for stream `stp'; caller already holds sd_lock.
 *
 * SIGPOLL is sent as an S_MSG event to the stream's siglist (only if S_MSG
 * is registered in sd_sigflags).  Any other signal goes to the stream's
 * process group, if one is set.
 */
void
strsignal_nolock(stdata_t *stp, int sig, uchar_t band)
{
	ASSERT(MUTEX_HELD(&stp->sd_lock));

	if (sig == SIGPOLL) {
		if (stp->sd_sigflags & S_MSG)
			strsendsig(stp->sd_siglist, S_MSG, band, 0);
		return;
	}

	if (stp->sd_pgidp != NULL)
		pgsignal(stp->sd_pgidp, sig);
}
/*
 * Deliver signal `sig' for stream `stp', taking sd_lock around the delivery.
 *
 * SIGPOLL is sent as an S_MSG event to the stream's siglist (only if S_MSG
 * is registered in sd_sigflags), with `band' narrowed to uchar_t.  Any other
 * signal goes to the stream's process group, if one is set.
 */
void
strsignal(stdata_t *stp, int sig, int32_t band)
{
	TRACE_3(TR_FAC_STREAMS_FR, TR_SENDSIG,
	    "strsignal:%p, %X, %X", stp, sig, band);

	mutex_enter(&stp->sd_lock);
	if (sig != SIGPOLL) {
		if (stp->sd_pgidp != NULL)
			pgsignal(stp->sd_pgidp, sig);
	} else if (stp->sd_sigflags & S_MSG) {
		strsendsig(stp->sd_siglist, S_MSG, (uchar_t)band, 0);
	}
	mutex_exit(&stp->sd_lock);
}
/*
 * Report a hangup on stream `stp': wake pollers with POLLHUP and, if
 * S_HANGUP is registered in sd_sigflags, send that event to the siglist.
 * Caller must own sd_lock.
 */
void
strhup(stdata_t *stp)
{
	ASSERT(mutex_owned(&stp->sd_lock));

	pollwakeup(&stp->sd_pollist, POLLHUP);

	if ((stp->sd_sigflags & S_HANGUP) == 0)
		return;

	strsendsig(stp->sd_siglist, S_HANGUP, 0, 0);
}
/*
 * Back-enable the nearest queue behind `q' that has a service procedure:
 * mark band `pri' on it with setqback() and schedule it with
 * qenable_locked().  The stream is held with claimstr()/releasestr() for
 * the duration of the call.
 */
void
backenable(queue_t *q, uchar_t pri)
{
queue_t *nq;
claimstr(q);
/* Walk backwards until a queue with a service procedure is found. */
for (nq = backq(q); nq && !nq->q_qinfo->qi_srvp; nq = backq(nq)) {
ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(nq));
}
if (nq) {
kthread_t *freezer;
/*
 * QLOCK(nq) is taken here unless the current thread is the freezer of
 * q's stream and nq belongs to that same stream; in that case the DEBUG
 * assertions below expect the stream to be frozen and both qlocks to be
 * held by the caller already.
 */
freezer = STREAM(q)->sd_freezer;
if (freezer != curthread || STREAM(q) != STREAM(nq)) {
mutex_enter(QLOCK(nq));
}
#ifdef DEBUG
else {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
ASSERT(MUTEX_HELD(QLOCK(nq)));
}
#endif
setqback(nq, pri);
qenable_locked(nq);
/* Drop QLOCK(nq) only if it was taken above (same condition). */
if (freezer != curthread || STREAM(q) != STREAM(nq))
mutex_exit(QLOCK(nq));
}
releasestr(q);
}
/*
 * Return the pending error for stream `stp', considering only the
 * conditions named in `flags_to_check' (a subset of
 * STRDERR|STWRERR|STRHUP|STPLEX).  Caller must hold sd_lock.
 *
 * Conditions are examined in the order STPLEX, STRDERR, STWRERR, STRHUP;
 * only the first one that is set is reported.  `ispeek' non-zero leaves
 * non-persistent read/write errors in place.  Returns 0 when there is no
 * error to report.
 */
int
strgeterr(stdata_t *stp, int32_t flags_to_check, int ispeek)
{
int32_t sd_flag = stp->sd_flag & flags_to_check;
int error = 0;
ASSERT(MUTEX_HELD(&stp->sd_lock));
ASSERT((flags_to_check & ~(STRDERR|STWRERR|STRHUP|STPLEX)) == 0);
if (sd_flag & STPLEX)
error = EINVAL;
else if (sd_flag & STRDERR) {
error = stp->sd_rerror;
/* A non-persistent read error is consumed by a non-peeking caller. */
if ((stp->sd_flag & STRDERRNONPERSIST) && !ispeek) {
stp->sd_rerror = 0;
stp->sd_flag &= ~STRDERR;
}
/*
 * No stored error value: ask the registered read-error callback.  The
 * callback may request (via clearerr) that it be unregistered and
 * STRDERR cleared.
 */
if (error == 0 && stp->sd_rderrfunc != NULL) {
int clearerr = 0;
error = (*stp->sd_rderrfunc)(stp->sd_vnode, ispeek,
&clearerr);
if (clearerr) {
stp->sd_flag &= ~STRDERR;
stp->sd_rderrfunc = NULL;
}
}
} else if (sd_flag & STWRERR) {
error = stp->sd_werror;
/* A non-persistent write error is consumed by a non-peeking caller. */
if ((stp->sd_flag & STWRERRNONPERSIST) && !ispeek) {
stp->sd_werror = 0;
stp->sd_flag &= ~STWRERR;
}
/* Same callback protocol as the read side, using sd_wrerrfunc. */
if (error == 0 && stp->sd_wrerrfunc != NULL) {
int clearerr = 0;
error = (*stp->sd_wrerrfunc)(stp->sd_vnode, ispeek,
&clearerr);
if (clearerr) {
stp->sd_flag &= ~STWRERR;
stp->sd_wrerrfunc = NULL;
}
}
} else if (sd_flag & STRHUP) {
/* For a hangup the value reported is whatever sd_werror holds. */
error = stp->sd_werror;
}
return (error);
}
/*
 * Begin a plumbing operation on stream `stp' by setting STRPLUMB in
 * sd_flag, after waiting for any open, close or other plumbing operation
 * (STWOPEN|STRCLOSE|STRPLUMB) to finish.  For a mated stream the mate's
 * flags are waited on as well.
 *
 * Returns 0 on success, EAGAIN if the caller asked for non-blocking
 * behaviour (FNDELAY|FNONBLOCK in `flag') and would have to wait, EINTR if
 * the wait was interrupted by a signal, or the error from strgeterr() if
 * the stream is in an error/hangup/multiplexed state.  On any non-zero
 * return STRPLUMB has not been set and no stream lock is held.
 */
int
strstartplumb(stdata_t *stp, int flag, int cmd)
{
int waited = 1;
int error = 0;
if (STRMATED(stp)) {
struct stdata *stmatep = stp->sd_mate;
STRLOCKMATES(stp);
/*
 * Repeat until one full pass over both streams completes without
 * sleeping; each sleep drops the other stream's lock, so both must be
 * rechecked afterwards.
 */
while (waited) {
waited = 0;
while (stmatep->sd_flag & (STWOPEN|STRCLOSE|STRPLUMB)) {
/*
 * NOTE(review): only I_POP is treated as non-blocking here, whereas the
 * non-mated path below also accepts _I_REMOVE -- confirm whether the
 * difference is intended.
 */
if ((cmd == I_POP) &&
(flag & (FNDELAY|FNONBLOCK))) {
STRUNLOCKMATES(stp);
return (EAGAIN);
}
waited = 1;
/* Sleep on the mate holding only the mate's lock. */
mutex_exit(&stp->sd_lock);
if (!cv_wait_sig(&stmatep->sd_monitor,
&stmatep->sd_lock)) {
mutex_exit(&stmatep->sd_lock);
return (EINTR);
}
/* Drop and retake both locks through STRLOCKMATES. */
mutex_exit(&stmatep->sd_lock);
STRLOCKMATES(stp);
}
while (stp->sd_flag & (STWOPEN|STRCLOSE|STRPLUMB)) {
if ((cmd == I_POP) &&
(flag & (FNDELAY|FNONBLOCK))) {
STRUNLOCKMATES(stp);
return (EAGAIN);
}
waited = 1;
/* Sleep on this stream holding only its own lock. */
mutex_exit(&stmatep->sd_lock);
if (!cv_wait_sig(&stp->sd_monitor,
&stp->sd_lock)) {
mutex_exit(&stp->sd_lock);
return (EINTR);
}
mutex_exit(&stp->sd_lock);
STRLOCKMATES(stp);
}
if (stp->sd_flag & (STRDERR|STWRERR|STRHUP|STPLEX)) {
error = strgeterr(stp,
STRDERR|STWRERR|STRHUP|STPLEX, 0);
if (error != 0) {
STRUNLOCKMATES(stp);
return (error);
}
}
}
stp->sd_flag |= STRPLUMB;
STRUNLOCKMATES(stp);
} else {
mutex_enter(&stp->sd_lock);
while (stp->sd_flag & (STWOPEN|STRCLOSE|STRPLUMB)) {
if (((cmd == I_POP) || (cmd == _I_REMOVE)) &&
(flag & (FNDELAY|FNONBLOCK))) {
mutex_exit(&stp->sd_lock);
return (EAGAIN);
}
if (!cv_wait_sig(&stp->sd_monitor, &stp->sd_lock)) {
mutex_exit(&stp->sd_lock);
return (EINTR);
}
/* In this path the error state is only examined after a wait. */
if (stp->sd_flag & (STRDERR|STWRERR|STRHUP|STPLEX)) {
error = strgeterr(stp,
STRDERR|STWRERR|STRHUP|STPLEX, 0);
if (error != 0) {
mutex_exit(&stp->sd_lock);
return (error);
}
}
}
stp->sd_flag |= STRPLUMB;
mutex_exit(&stp->sd_lock);
}
return (0);
}
/*
 * Finish a plumbing operation on `stp': clear STRPLUMB and wake every
 * thread sleeping on sd_monitor.  Caller must hold sd_lock and STRPLUMB
 * must currently be set.
 */
void
strendplumb(stdata_t *stp)
{
	ASSERT(MUTEX_HELD(&stp->sd_lock));
	ASSERT(stp->sd_flag & STRPLUMB);

	stp->sd_flag = stp->sd_flag & ~STRPLUMB;
	cv_broadcast(&stp->sd_monitor);
}
/*
 * Acquire the locks needed to modify the structure of stream `stp'.
 *
 * On return the caller holds sd_lock and sd_reflock of the stream (and of
 * its mate, for a mated stream), the stream putlocks, and -- when `sqlist'
 * is non-NULL -- SQLOCK plus the putlocks of every syncq on the list.  The
 * function does not return until sd_refcnt is zero on the stream(s) and
 * the summed claim count of every listed syncq equals its sq_rmqcount.
 * Whenever it has to wait for a syncq it drops everything and starts over.
 */
static void
strlock(struct stdata *stp, sqlist_t *sqlist)
{
syncql_t *sql, *sql2;
retry:
if (STRMATED(stp)) {
struct stdata *stp1, *stp2;
STRLOCKMATES(stp);
/*
 * Pick the order in which the two sd_reflocks are taken from the
 * addresses of the streams' sd_lock fields, so that the choice is the
 * same whichever mate is passed in.
 */
if (&(stp->sd_lock) > &((stp->sd_mate)->sd_lock)) {
stp1 = stp;
stp2 = stp->sd_mate;
} else {
stp2 = stp;
stp1 = stp->sd_mate;
}
mutex_enter(&stp1->sd_reflock);
if (stp1->sd_refcnt > 0) {
/* Outstanding claim: drop the stream locks, wait, start over. */
STRUNLOCKMATES(stp);
cv_wait(&stp1->sd_refmonitor, &stp1->sd_reflock);
mutex_exit(&stp1->sd_reflock);
goto retry;
}
mutex_enter(&stp2->sd_reflock);
if (stp2->sd_refcnt > 0) {
STRUNLOCKMATES(stp);
mutex_exit(&stp1->sd_reflock);
cv_wait(&stp2->sd_refmonitor, &stp2->sd_reflock);
mutex_exit(&stp2->sd_reflock);
goto retry;
}
STREAM_PUTLOCKS_ENTER(stp1);
STREAM_PUTLOCKS_ENTER(stp2);
} else {
mutex_enter(&stp->sd_lock);
mutex_enter(&stp->sd_reflock);
while (stp->sd_refcnt > 0) {
mutex_exit(&stp->sd_lock);
cv_wait(&stp->sd_refmonitor, &stp->sd_reflock);
/*
 * sd_lock is normally taken before sd_reflock.  Try to get it without
 * blocking; if that fails, release sd_reflock and take both again in
 * the normal order.
 */
if (mutex_tryenter(&stp->sd_lock) == 0) {
mutex_exit(&stp->sd_reflock);
mutex_enter(&stp->sd_lock);
mutex_enter(&stp->sd_reflock);
}
}
STREAM_PUTLOCKS_ENTER(stp);
}
if (sqlist == NULL)
return;
for (sql = sqlist->sqlist_head; sql; sql = sql->sql_next) {
syncq_t *sq = sql->sql_sq;
uint16_t count;
mutex_enter(SQLOCK(sq));
count = sq->sq_count;
ASSERT(sq->sq_rmqcount <= count);
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
/* Only removeq-style claims remain: keep this syncq locked, move on. */
if (count == sq->sq_rmqcount)
continue;
/* Otherwise release the stream locks taken above ... */
if (STRMATED(stp)) {
STREAM_PUTLOCKS_EXIT(stp);
STREAM_PUTLOCKS_EXIT(stp->sd_mate);
STRUNLOCKMATES(stp);
mutex_exit(&stp->sd_reflock);
mutex_exit(&stp->sd_mate->sd_reflock);
} else {
STREAM_PUTLOCKS_EXIT(stp);
mutex_exit(&stp->sd_lock);
mutex_exit(&stp->sd_reflock);
}
/* ... and every syncq locked earlier in this pass (not `sq' itself). */
for (sql2 = sqlist->sqlist_head; sql2 != sql;
sql2 = sql2->sql_next) {
SQ_PUTLOCKS_EXIT(sql2->sql_sq);
mutex_exit(SQLOCK(sql2->sql_sq));
}
/*
 * Wait, holding only SQLOCK(sq), for the extra claims on this syncq to
 * go away.  sq_needexcl is raised and the putcount fast path cleared for
 * the duration of the wait.
 */
sq->sq_needexcl++;
SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
while (count > sq->sq_rmqcount) {
sq->sq_flags |= SQ_WANTWAKEUP;
SQ_PUTLOCKS_EXIT(sq);
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
sq->sq_needexcl--;
if (sq->sq_needexcl == 0)
SQ_PUTCOUNT_SETFAST_LOCKED(sq);
SQ_PUTLOCKS_EXIT(sq);
ASSERT(count == sq->sq_rmqcount);
mutex_exit(SQLOCK(sq));
/* Everything was dropped; acquire the whole set again from the top. */
goto retry;
}
}
/*
 * Release the locks taken by strlock(): the stream putlocks, sd_lock and
 * sd_reflock (for both mates when the stream is mated), followed by the
 * putlocks and SQLOCK of every syncq on `sqlist' when one is supplied.
 */
static void
strunlock(struct stdata *stp, sqlist_t *sqlist)
{
	syncql_t *entry;

	if (!STRMATED(stp)) {
		STREAM_PUTLOCKS_EXIT(stp);
		mutex_exit(&stp->sd_lock);
		mutex_exit(&stp->sd_reflock);
	} else {
		STREAM_PUTLOCKS_EXIT(stp);
		STREAM_PUTLOCKS_EXIT(stp->sd_mate);
		STRUNLOCKMATES(stp);
		mutex_exit(&stp->sd_reflock);
		mutex_exit(&stp->sd_mate->sd_reflock);
	}

	if (sqlist == NULL)
		return;

	entry = sqlist->sqlist_head;
	while (entry != NULL) {
		SQ_PUTLOCKS_EXIT(entry->sql_sq);
		mutex_exit(SQLOCK(entry->sql_sq));
		entry = entry->sql_next;
	}
}
/*
 * After queue `q' has been inserted into a stream, back-enable on its
 * behalf if the next queue with a service procedure downstream of it
 * (q->q_next->q_nfsrv) is flagged as wanting to write: band 0 when QWANTW
 * is set, and each band flagged QB_WANTW that has a message queued.
 *
 * Nothing is done unless `q' has a service procedure and a next queue.
 */
static void
backenable_insertedq(queue_t *q)
{
	qband_t *band;

	claimstr(q);

	if (q->q_qinfo->qi_srvp == NULL || q->q_next == NULL) {
		releasestr(q);
		return;
	}

	if (q->q_next->q_nfsrv->q_flag & QWANTW)
		backenable(q, 0);

	band = q->q_next->q_nfsrv->q_bandp;
	while (band != NULL) {
		if ((band->qb_flag & QB_WANTW) && band->qb_first != NULL)
			backenable(q, band->qb_first->b_band);
		band = band->qb_next;
	}

	releasestr(q);
}
/*
 * Link the queue pair whose read side is `new' into stream `stp'.
 *
 * If _QINSERTING is set on `new', the pair goes in front of the queue
 * already stored in new->q_next; otherwise it goes directly below the
 * stream head.  The links are changed under strlock(), sd_pushcnt is
 * bumped unless the new queue is a driver (QISDRV), and both new queues
 * are checked for back-enabling once the locks are dropped.
 */
void
insertq(struct stdata *stp, queue_t *new)
{
queue_t *after;
queue_t *wafter;
queue_t *wnew = _WR(new);
boolean_t have_fifo = B_FALSE;
/* `after'/`wafter' are the read/write queues the new pair sits below. */
if (new->q_flag & _QINSERTING) {
ASSERT(stp->sd_vnode->v_type != VFIFO);
after = new->q_next;
wafter = _WR(new->q_next);
} else {
after = _RD(stp->sd_wrq);
wafter = stp->sd_wrq;
}
TRACE_2(TR_FAC_STREAMS_FR, TR_INSERTQ,
"insertq:%p, %p", after, new);
ASSERT(after->q_flag & QREADR);
ASSERT(new->q_flag & QREADR);
strlock(stp, NULL);
/*
 * A write queue whose q_next is its own read partner is treated as the
 * FIFO case: the new write queue is then looped back to the new read
 * queue in the same way.
 */
if (wafter->q_next == after) {
have_fifo = B_TRUE;
wnew->q_next = new;
} else {
wnew->q_next = wafter->q_next;
}
new->q_next = after;
set_nfsrv_ptr(new, wnew, after, wafter);
/* Cleared only after set_nfsrv_ptr() has been called with it intact. */
new->q_flag &= ~_QINSERTING;
if (have_fifo) {
wafter->q_next = wnew;
} else {
/* Point the old downstream neighbour's read side at the new queue. */
if (wafter->q_next)
_OTHERQ(wafter->q_next)->q_next = new;
wafter->q_next = wnew;
}
set_qend(new);
set_qend(after);
ASSERT(_SAMESTR(new) == O_SAMESTR(new));
ASSERT(_SAMESTR(wnew) == O_SAMESTR(wnew));
ASSERT(_SAMESTR(after) == O_SAMESTR(after));
ASSERT(_SAMESTR(wafter) == O_SAMESTR(wafter));
strsetuio(stp);
/* Only non-driver queues count towards the push count. */
if (!(new->q_flag & QISDRV))
stp->sd_pushcnt++;
strunlock(stp, NULL);
backenable_insertedq(wnew);
backenable_insertedq(new);
}
/*
 * Unlink the queue pair whose read side is `qp' from its stream.
 *
 * Before the links are touched, waiters on the pair's syncq(s) are woken
 * and, for QSYNCSTR queues, q_rwcnt is allowed to drain on both sides.
 * The unlinking itself runs under strlock() with a list of every syncq
 * in the stream (and its mate, if mated).  Messages or events left on the
 * pair's syncqs are handed to propagate_syncq(), and the neighbouring
 * syncqs are drained afterwards if anything was moved.
 */
void
removeq(queue_t *qp)
{
queue_t *wqp = _WR(qp);
struct stdata *stp = STREAM(qp);
sqlist_t *sqlist = NULL;
boolean_t isdriver;
int moved;
syncq_t *sq = qp->q_syncq;
syncq_t *wsq = wqp->q_syncq;
ASSERT(stp);
TRACE_2(TR_FAC_STREAMS_FR, TR_REMOVEQ,
"removeq:%p %p", qp, wqp);
ASSERT(qp->q_flag&QREADR);
if (qp->q_flag & QSYNCSTR) {
/* Wake anyone sleeping on the read-side (and write-side) syncq. */
mutex_enter(SQLOCK(sq));
if (sq->sq_flags & SQ_WANTWAKEUP) {
sq->sq_flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
mutex_exit(SQLOCK(sq));
if (wsq != sq) {
mutex_enter(SQLOCK(wsq));
if (wsq->sq_flags & SQ_WANTWAKEUP) {
wsq->sq_flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&wsq->sq_wait);
}
mutex_exit(SQLOCK(wsq));
}
/* Wait for q_rwcnt to reach zero on each queue of the pair. */
mutex_enter(QLOCK(qp));
while (qp->q_rwcnt > 0) {
qp->q_flag |= QWANTRMQSYNC;
cv_wait(&qp->q_wait, QLOCK(qp));
}
mutex_exit(QLOCK(qp));
mutex_enter(QLOCK(wqp));
while (wqp->q_rwcnt > 0) {
wqp->q_flag |= QWANTRMQSYNC;
cv_wait(&wqp->q_wait, QLOCK(wqp));
}
mutex_exit(QLOCK(wqp));
}
/*
 * Record that a removal is in progress on this syncq.  strlock() compares
 * each syncq's claim count against sq_rmqcount, so waiters are woken to
 * re-evaluate.  The matching decrement is at the end of this function.
 */
mutex_enter(SQLOCK(sq));
sq->sq_rmqcount++;
if (sq->sq_flags & SQ_WANTWAKEUP) {
sq->sq_flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
mutex_exit(SQLOCK(sq));
isdriver = (qp->q_flag & QISDRV);
sqlist = sqlist_build(qp, stp, STRMATED(stp));
strlock(stp, sqlist);
reset_nfsrv_ptr(qp, wqp);
ASSERT(wqp->q_next == NULL || backq(qp)->q_next == qp);
ASSERT(qp->q_next == NULL || backq(wqp)->q_next == wqp);
/*
 * If the write queue loops straight back to the read queue, restore the
 * same loop at the stream head; otherwise splice the neighbours together
 * around the removed pair.
 */
if (wqp->q_next == qp) {
stp->sd_wrq->q_next = _RD(stp->sd_wrq);
} else {
if (wqp->q_next)
backq(qp)->q_next = qp->q_next;
if (qp->q_next)
backq(wqp)->q_next = wqp->q_next;
}
if (qp->q_next)
set_qend(qp->q_next);
ASSERT(_SAMESTR(stp->sd_wrq) == O_SAMESTR(stp->sd_wrq));
ASSERT(_SAMESTR(_RD(stp->sd_wrq)) == O_SAMESTR(_RD(stp->sd_wrq)));
/* `moved' accumulates what propagate_syncq() reports for each side. */
moved = 0;
if (qp->q_syncqmsgs != 0 || (qp->q_syncq->sq_flags & SQ_EVENTS))
moved += propagate_syncq(qp);
if (wqp->q_syncqmsgs != 0 ||
(wqp->q_syncq->sq_flags & SQ_EVENTS))
moved += propagate_syncq(wqp);
strsetuio(stp);
/* Only non-driver queues count towards the push count. */
if (!isdriver)
stp->sd_pushcnt--;
strunlock(stp, sqlist);
sqlist_free(sqlist);
/*
 * With the locks dropped, clear any full condition via clr_qfull() and
 * drain the neighbouring syncqs if something was propagated.  The read
 * side is not drained when the removed queue was a driver.
 */
if (qp->q_next != NULL) {
clr_qfull(qp);
if (!isdriver && (moved > 0))
emptysq(qp->q_next->q_syncq);
}
if (wqp->q_next != NULL) {
clr_qfull(wqp);
if (moved > 0)
emptysq(wqp->q_next->q_syncq);
}
mutex_enter(SQLOCK(sq));
sq->sq_rmqcount--;
mutex_exit(SQLOCK(sq));
}
/*
 * Set `flag' in sq->sq_flags, first waiting until the flag is clear.
 *
 * When `maxcnt' is not -1 the function additionally waits until the summed
 * claim count of the syncq is no greater than `maxcnt'; in that mode the
 * putlocks are held while the count is sampled and the putcount fast path
 * is cleared.  sq_needexcl is raised for the duration of the wait and the
 * fast path is re-enabled when it drops back to zero.
 */
static void
blocksq(syncq_t *sq, ushort_t flag, int maxcnt)
{
uint16_t count = 0;
mutex_enter(SQLOCK(sq));
if (maxcnt != -1) {
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
sq->sq_needexcl++;
/* A zero value here would mean the counter wrapped. */
ASSERT(sq->sq_needexcl != 0);
while ((sq->sq_flags & flag) ||
(maxcnt != -1 && count > (unsigned)maxcnt)) {
sq->sq_flags |= SQ_WANTWAKEUP;
/* The putlocks are released across the sleep and retaken to recount. */
if (maxcnt != -1) {
SQ_PUTLOCKS_EXIT(sq);
}
cv_wait(&sq->sq_wait, SQLOCK(sq));
if (maxcnt != -1) {
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
}
sq->sq_needexcl--;
sq->sq_flags |= flag;
ASSERT(maxcnt == -1 || count == maxcnt);
if (maxcnt != -1) {
if (sq->sq_needexcl == 0) {
SQ_PUTCOUNT_SETFAST_LOCKED(sq);
}
SQ_PUTLOCKS_EXIT(sq);
} else if (sq->sq_needexcl == 0) {
/* Putlocks are not held on this path, hence the non-_LOCKED variant. */
SQ_PUTCOUNT_SETFAST(sq);
}
mutex_exit(SQLOCK(sq));
}
/*
 * Clear `resetflag' (which must not be SQ_WRITER) in sq->sq_flags and wake
 * any waiters.  If work is queued on the syncq, nothing in
 * SQ_STAYAWAY|SQ_EXCL prevents draining, and `isouter' is zero, the syncq
 * is drained before returning.
 */
static void
unblocksq(syncq_t *sq, uint16_t resetflag, int isouter)
{
uint16_t flags;
mutex_enter(SQLOCK(sq));
ASSERT(resetflag != SQ_WRITER);
ASSERT(sq->sq_flags & resetflag);
flags = sq->sq_flags & ~resetflag;
sq->sq_flags = flags;
if (flags & (SQ_QUEUED | SQ_WANTWAKEUP)) {
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
sq->sq_flags = flags;
if ((flags & SQ_QUEUED) && !(flags & (SQ_STAYAWAY|SQ_EXCL))) {
if (!isouter) {
/* drain_syncq() releases SQLOCK(sq) before it returns. */
drain_syncq(sq);
return;
}
}
}
mutex_exit(SQLOCK(sq));
}
/*
 * Clear `resetflag' in sq->sq_flags, waking any waiters, without draining.
 *
 * Returns 1 if the syncq still has SQ_QUEUED bits set afterwards (i.e. the
 * caller should arrange for it to be drained), 0 otherwise.
 */
static int
dropsq(syncq_t *sq, uint16_t resetflag)
{
	uint16_t remaining;

	mutex_enter(SQLOCK(sq));
	ASSERT(sq->sq_flags & resetflag);

	remaining = sq->sq_flags & ~resetflag;
	if (remaining & SQ_WANTWAKEUP) {
		cv_broadcast(&sq->sq_wait);
		remaining &= ~SQ_WANTWAKEUP;
	}
	sq->sq_flags = remaining;
	mutex_exit(SQLOCK(sq));

	return ((remaining & SQ_QUEUED) ? 1 : 0);
}
/*
 * Get queued work on `sq' processed, if there is any and neither
 * SQ_STAYAWAY nor SQ_EXCL forbids it: drain inline when nobody holds a
 * claim (sq_count == 0), otherwise schedule the syncq with sqenable().
 */
static void
emptysq(syncq_t *sq)
{
	uint16_t sqflags;

	mutex_enter(SQLOCK(sq));
	sqflags = sq->sq_flags;

	if (!(sqflags & SQ_QUEUED) || (sqflags & (SQ_STAYAWAY|SQ_EXCL))) {
		mutex_exit(SQLOCK(sq));
		return;
	}

	if (sq->sq_count == 0) {
		/* drain_syncq() releases SQLOCK(sq) before it returns. */
		drain_syncq(sq);
		return;
	}

	sqenable(sq);
	mutex_exit(SQLOCK(sq));
}
/*
 * Add syncq `sqp' to `sqlist', keeping the list sorted by ascending syncq
 * address.  A syncq that is already on the list is not added again.  The
 * new entry is taken from the next unused slot of sqlist_array.
 */
static void
sqlist_insert(sqlist_t *sqlist, syncq_t *sqp)
{
	syncql_t **linkp;
	syncql_t *cur;
	syncql_t *slot;

	/* Stop at the first entry that is not below sqp, or at the end. */
	for (linkp = &sqlist->sqlist_head; (cur = *linkp) != NULL;
	    linkp = &cur->sql_next) {
		if (cur->sql_sq == sqp)
			return;
		if (cur->sql_sq > sqp)
			break;
	}

	slot = &sqlist->sqlist_array[sqlist->sqlist_index++];
	ASSERT((char *)slot < (char *)sqlist + sqlist->sqlist_size);

	slot->sql_sq = sqp;
	slot->sql_next = cur;
	*linkp = slot;
}
/*
 * Starting at `q', insert the syncq of every queue reached into `sqlist'.
 *
 * The walk follows q_next while _SAMESTR() holds; when it does not, a
 * write-side queue continues from its read partner and a read-side queue
 * ends the walk.
 */
static void
sqlist_insertall(sqlist_t *sqlist, queue_t *q)
{
	for (; q != NULL; ) {
		sqlist_insert(sqlist, q->q_syncq);

		if (_SAMESTR(q)) {
			q = q->q_next;
			continue;
		}
		q = (q->q_flag & QREADR) ? NULL : _RD(q);
	}
}
/*
 * Allocate and return a sorted list holding the syncqs of read queue `q',
 * its write partner, and every queue reachable from the stream head's
 * write queue; with `do_twist' set the mate stream's queues are added too.
 * The caller releases the list with sqlist_free().
 */
static sqlist_t *
sqlist_build(queue_t *q, struct stdata *stp, boolean_t do_twist)
{
	sqlist_t *list;

	list = sqlist_alloc(stp, KM_SLEEP);

	ASSERT(q->q_flag & QREADR);

	/* The pair being operated on goes in first. */
	sqlist_insert(list, q->q_syncq);
	sqlist_insert(list, _WR(q)->q_syncq);

	sqlist_insertall(list, stp->sd_wrq);
	if (do_twist)
		sqlist_insertall(list, stp->sd_mate->sd_wrq);

	return (list);
}
static sqlist_t *
sqlist_alloc(struct stdata *stp, int kmflag)
{
size_t sqlist_size;
sqlist_t *sqlist;
sqlist_size = 2 * sizeof (syncql_t) * stp->sd_pushcnt +
sizeof (sqlist_t);
if (STRMATED(stp))
sqlist_size += 2 * sizeof (syncql_t) * stp->sd_mate->sd_pushcnt;
sqlist = kmem_alloc(sqlist_size, kmflag);
sqlist->sqlist_head = NULL;
sqlist->sqlist_size = sqlist_size;
sqlist->sqlist_index = 0;
return (sqlist);
}
/*
 * Release a list obtained from sqlist_alloc(), using the size recorded in
 * the list itself.
 */
static void
sqlist_free(sqlist_t *sqlist)
{
	size_t nbytes = sqlist->sqlist_size;

	kmem_free(sqlist, nbytes);
}
/*
 * Set SQ_FROZEN on every syncq of the stream that `q' belongs to, via
 * blocksq() with no claim-count limit.  `q' may be either side of a pair.
 */
void
strblock(queue_t *q)
{
	struct stdata *stp;
	sqlist_t *list;
	syncql_t *entry;

	q = _RD(q);
	stp = STREAM(q);
	ASSERT(stp != NULL);

	list = sqlist_build(q, stp, B_FALSE);

	entry = list->sqlist_head;
	while (entry != NULL) {
		blocksq(entry->sql_sq, SQ_FROZEN, -1);
		entry = entry->sql_next;
	}

	sqlist_free(list);
}
/*
 * Clear SQ_FROZEN on every syncq of the stream that `q' belongs to.  If
 * dropsq() reports queued work on any of them, a second pass calls
 * emptysq() on every syncq of the stream.
 */
void
strunblock(queue_t *q)
{
	struct stdata *stp;
	sqlist_t *list;
	syncql_t *entry;
	int pending = 0;

	q = _RD(q);
	stp = STREAM(q);
	ASSERT(stp != NULL);

	list = sqlist_build(q, stp, B_FALSE);

	/* First pass: unfreeze everything and note whether work is queued. */
	entry = list->sqlist_head;
	while (entry != NULL) {
		pending += dropsq(entry->sql_sq, SQ_FROZEN);
		entry = entry->sql_next;
	}

	/* Second pass: only needed if some syncq had work queued. */
	if (pending) {
		entry = list->sqlist_head;
		while (entry != NULL) {
			emptysq(entry->sql_sq);
			entry = entry->sql_next;
		}
	}

	sqlist_free(list);
}
#ifdef DEBUG
/*
 * DEBUG helper: returns non-zero when read queue `rq' has a next queue and
 * that queue's write side points back at rq's write side; 0 otherwise.
 */
static int
qprocsareon(queue_t *rq)
{
	queue_t *upstream = rq->q_next;

	if (upstream == NULL)
		return (0);

	return (_WR(upstream)->q_next == _WR(rq));
}
/*
 * DEBUG helper: returns non-zero if the syncq of `q' has any claim, taking
 * sq_count and the counts added by SUM_SQ_PUTCOUNTS() together.  No lock
 * is taken here.
 */
int
qclaimed(queue_t *q)
{
	uint_t claims = q->q_syncq->sq_count;

	SUM_SQ_PUTCOUNTS(q->q_syncq, claims);

	return ((claims != 0) ? 1 : 0);
}
/*
 * DEBUG helper: returns 1 if SQ_FROZEN is set on the syncq of `q', else 0.
 */
int
frozenstr(queue_t *q)
{
	syncq_t *sq = q->q_syncq;

	return ((sq->sq_flags & SQ_FROZEN) ? 1 : 0);
}
#endif
/*
 * Enter the syncq of queue `q' using the SQ_CALLBACK entry point.
 * Paired with leaveq().
 */
void
enterq(queue_t *q)
{
	syncq_t *sq = q->q_syncq;

	entersq(sq, SQ_CALLBACK);
}
/*
 * Leave the syncq of queue `q' using the SQ_CALLBACK entry point.
 * Paired with enterq().
 */
void
leaveq(queue_t *q)
{
	syncq_t *sq = q->q_syncq;

	leavesq(sq, SQ_CALLBACK);
}
/*
 * Enter syncq `sq' on behalf of the entry point described by `entrypoint'.
 *
 * `entrypoint' is masked with SQ_CI and SQ_CO to obtain the inner and
 * outer concurrency bits for this entry point; these are tested against
 * sq_type.  If the inner bit is not set in sq_type the entry is exclusive:
 * the function waits for the claim count to reach zero and sets SQ_EXCL.
 * If the outer bit is not set in sq_type the outer perimeter is entered
 * through outer_enter() first.  On return sq_count has been incremented;
 * leavesq() undoes the entry.
 */
void
entersq(syncq_t *sq, int entrypoint)
{
uint16_t count = 0;
uint16_t flags;
uint16_t waitflags = SQ_STAYAWAY | SQ_EVENTS | SQ_EXCL;
uint16_t type;
uint_t c_inner = entrypoint & SQ_CI;
uint_t c_outer = entrypoint & SQ_CO;
ASSERT(sq);
ASSERT(c_inner && c_outer);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
type = sq->sq_type;
if (!(type & c_inner)) {
/*
 * Exclusive entry: sample the full claim count under the putlocks,
 * clear the putcount fast path, and also wait for SQ_MESSAGES.
 */
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
SUM_SQ_PUTCOUNTS(sq, count);
sq->sq_needexcl++;
/* A zero value here would mean the counter wrapped. */
ASSERT(sq->sq_needexcl != 0);
waitflags |= SQ_MESSAGES;
}
/* Wait until the inner perimeter can be entered. */
while ((flags & waitflags) || (!(type & c_inner) && count != 0)) {
sq->sq_flags = flags | SQ_WANTWAKEUP;
if (!(type & c_inner)) {
SQ_PUTLOCKS_EXIT(sq);
}
cv_wait(&sq->sq_wait, SQLOCK(sq));
if (!(type & c_inner)) {
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
flags = sq->sq_flags;
}
if (!(type & c_inner)) {
ASSERT(sq->sq_needexcl > 0);
sq->sq_needexcl--;
if (sq->sq_needexcl == 0) {
SQ_PUTCOUNT_SETFAST_LOCKED(sq);
}
}
if (!(type & c_outer)) {
/*
 * The outer perimeter is entered with no syncq locks held, so the
 * flags and count must be re-read and re-checked afterwards.
 */
if (!(type & c_inner)) {
SQ_PUTLOCKS_EXIT(sq);
}
mutex_exit(SQLOCK(sq));
outer_enter(sq->sq_outer, SQ_GOAWAY);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
count = sq->sq_count;
#ifdef DEBUG
SUMCHECK_SQ_PUTCOUNTS(sq, 0);
#endif
while ((flags & (SQ_EXCL|SQ_BLOCKED|SQ_FROZEN)) ||
(!(type & c_inner) && count != 0)) {
sq->sq_flags = flags | SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
flags = sq->sq_flags;
}
}
/* Make the claim. */
sq->sq_count++;
/* A zero value here would mean the counter wrapped. */
ASSERT(sq->sq_count != 0);
if (!(type & c_inner)) {
ASSERT(sq->sq_count == 1);
sq->sq_flags |= SQ_EXCL;
/*
 * The putlocks are still held only if the outer-perimeter branch above
 * (which releases them) was not taken.
 */
if (type & c_outer) {
SQ_PUTLOCKS_EXIT(sq);
}
}
mutex_exit(SQLOCK(sq));
}
/*
 * Leave syncq `sq'; the counterpart of entersq() for the same
 * `entrypoint'.
 *
 * Drops the claim (sq_count), clears SQ_EXCL, wakes waiters on sq_wait and
 * sq_exitwait, drains the syncq if work is queued and SQ_STAYAWAY does not
 * forbid it, and exits the outer perimeter when the outer concurrency bit
 * for this entry point is not set in sq_type.
 */
void
leavesq(syncq_t *sq, int entrypoint)
{
uint16_t flags;
uint16_t type;
uint_t c_outer = entrypoint & SQ_CO;
#ifdef DEBUG
/* c_inner is referenced only from ASSERTs, hence DEBUG-only. */
uint_t c_inner = entrypoint & SQ_CI;
#endif
ASSERT(sq);
ASSERT(c_inner && c_outer);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
type = sq->sq_type;
if (flags & (SQ_QUEUED|SQ_WANTWAKEUP|SQ_WANTEXWAKEUP)) {
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
if (flags & SQ_WANTEXWAKEUP) {
flags &= ~SQ_WANTEXWAKEUP;
cv_broadcast(&sq->sq_exitwait);
}
if ((flags & SQ_QUEUED) && !(flags & SQ_STAYAWAY)) {
/*
 * Give up the claim and SQ_EXCL first, then drain.  drain_syncq()
 * releases SQLOCK(sq) before it returns.
 */
ASSERT(sq->sq_count != 0);
sq->sq_count--;
ASSERT((flags & SQ_EXCL) || (type & c_inner));
sq->sq_flags = flags & ~SQ_EXCL;
drain_syncq(sq);
ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
if (!(type & c_outer))
outer_exit(sq->sq_outer);
return;
}
}
ASSERT(sq->sq_count != 0);
sq->sq_count--;
ASSERT((flags & SQ_EXCL) || (type & c_inner));
sq->sq_flags = flags & ~SQ_EXCL;
mutex_exit(SQLOCK(sq));
/*
 * NOTE(review): this test re-reads sq->sq_type after SQLOCK is dropped,
 * whereas the drain path above uses the cached `type' -- confirm the
 * difference is harmless.
 */
if (!(sq->sq_type & c_outer))
outer_exit(sq->sq_outer);
}
/*
 * Take a claim on the syncq of queue `qp' by incrementing sq_count under
 * SQLOCK.  Paired with releaseq().
 */
void
claimq(queue_t *qp)
{
	syncq_t *syncq = qp->q_syncq;

	mutex_enter(SQLOCK(syncq));
	syncq->sq_count++;
	/* A zero value here would mean the counter wrapped. */
	ASSERT(syncq->sq_count != 0);
	mutex_exit(SQLOCK(syncq));
}
/*
 * Drop a claim taken with claimq() on the syncq of queue `qp'.
 *
 * Waiters on sq_wait are woken.  If work is queued and neither SQ_STAYAWAY
 * nor SQ_EXCL forbids it, the syncq is drained inline when this was the
 * last claim, and otherwise scheduled with sqenable().
 */
void
releaseq(queue_t *qp)
{
syncq_t *sq = qp->q_syncq;
uint16_t flags;
mutex_enter(SQLOCK(sq));
ASSERT(sq->sq_count > 0);
sq->sq_count--;
flags = sq->sq_flags;
if (flags & (SQ_WANTWAKEUP|SQ_QUEUED)) {
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
sq->sq_flags = flags;
if ((flags & SQ_QUEUED) && !(flags & (SQ_STAYAWAY|SQ_EXCL))) {
if (sq->sq_count == 0) {
/* drain_syncq() releases SQLOCK(sq) before it returns. */
drain_syncq(sq);
return;
} else
sqenable(sq);
}
}
mutex_exit(SQLOCK(sq));
}
/*
 * Take a reference on the stream that queue `qp' belongs to by
 * incrementing sd_refcnt under sd_reflock.  Paired with releasestr().
 */
void
claimstr(queue_t *qp)
{
	struct stdata *strm = STREAM(qp);

	mutex_enter(&strm->sd_reflock);
	strm->sd_refcnt++;
	/* A zero value here would mean the counter wrapped. */
	ASSERT(strm->sd_refcnt != 0);
	mutex_exit(&strm->sd_reflock);
}
/*
 * Drop a reference taken with claimstr().  When the count reaches zero,
 * every thread sleeping on sd_refmonitor is woken.
 */
void
releasestr(queue_t *qp)
{
	struct stdata *strm = STREAM(qp);

	mutex_enter(&strm->sd_reflock);
	ASSERT(strm->sd_refcnt != 0);

	strm->sd_refcnt--;
	if (strm->sd_refcnt == 0)
		cv_broadcast(&strm->sd_refmonitor);

	mutex_exit(&strm->sd_reflock);
}
/*
 * Allocate a syncq from syncq_cache with KM_SLEEP.
 */
static syncq_t *
new_syncq(void)
{
	syncq_t *sq = kmem_cache_alloc(syncq_cache, KM_SLEEP);

	return (sq);
}
/*
 * Return syncq `sq' to syncq_cache.
 *
 * The syncq must have no queues, no outer perimeter and no pending
 * callbacks.  Its ciputctrl array, if any, is returned to ciputctrl_cache,
 * and the fields listed below are reset before the object is freed.
 */
static void
free_syncq(syncq_t *sq)
{
	ASSERT(sq->sq_head == NULL);
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_callbpend == NULL);
	ASSERT((sq->sq_onext == NULL && sq->sq_oprev == NULL) ||
	    (sq->sq_onext == sq && sq->sq_oprev == sq));

	if (sq->sq_ciputctrl != NULL) {
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(sq->sq_ciputctrl,
		    sq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, sq->sq_ciputctrl);
	}

	/* Queue and event list pointers. */
	sq->sq_tail = NULL;
	sq->sq_evhead = NULL;
	sq->sq_evtail = NULL;

	/* Per-CPU put control. */
	sq->sq_ciputctrl = NULL;
	sq->sq_nciputctrl = 0;

	/* Counters and callback state. */
	sq->sq_count = 0;
	sq->sq_rmqcount = 0;
	sq->sq_callbflags = 0;
	sq->sq_cancelid = 0;
	sq->sq_next = NULL;
	sq->sq_needexcl = 0;
	sq->sq_svcflags = 0;
	sq->sq_nqueues = 0;
	sq->sq_pri = 0;

	/* Outer perimeter linkage, flags and type. */
	sq->sq_onext = NULL;
	sq->sq_oprev = NULL;
	sq->sq_flags = 0;
	sq->sq_type = 0;
	sq->sq_servcount = 0;

	kmem_cache_free(syncq_cache, sq);
}
/*
 * Enter the outer perimeter `outer' as writer.
 *
 * `flags' (which must include SQ_WRITER) names the outer sq_flags bits
 * that have to be clear before entry.  SQ_WRITER is then set on the outer
 * syncq and on every inner syncq on its sq_onext ring, and the function
 * waits for the claim count of each inner syncq to reach zero.  If any of
 * the other bits in `flags' got set on the outer syncq during that wait,
 * the perimeter is exited and the whole sequence retried.  Paired with
 * outer_exit().
 */
void
outer_enter(syncq_t *outer, uint16_t flags)
{
syncq_t *sq;
int wait_needed;
uint16_t count;
ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
outer->sq_oprev != NULL);
ASSERT(flags & SQ_WRITER);
retry:
mutex_enter(SQLOCK(outer));
while (outer->sq_flags & flags) {
outer->sq_flags |= SQ_WANTWAKEUP;
cv_wait(&outer->sq_wait, SQLOCK(outer));
}
ASSERT(!(outer->sq_flags & SQ_WRITER));
outer->sq_flags |= SQ_WRITER;
outer->sq_count++;
/* A zero value here would mean the counter wrapped. */
ASSERT(outer->sq_count != 0);
wait_needed = 0;
/*
 * Mark every inner syncq SQ_WRITER while still holding the outer SQLOCK,
 * noting whether any of them currently has claims.
 */
for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext) {
mutex_enter(SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
sq->sq_flags |= SQ_WRITER;
SUM_SQ_PUTCOUNTS(sq, count);
if (count != 0)
wait_needed = 1;
SQ_PUTLOCKS_EXIT(sq);
mutex_exit(SQLOCK(sq));
}
mutex_exit(SQLOCK(outer));
if (wait_needed) {
/* Wait, one inner syncq at a time, for the claims to go away. */
for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext) {
mutex_enter(SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
while (count != 0) {
sq->sq_flags |= SQ_WANTWAKEUP;
SQ_PUTLOCKS_EXIT(sq);
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
SQ_PUTLOCKS_EXIT(sq);
mutex_exit(SQLOCK(sq));
}
/*
 * The outer SQLOCK was not held during the wait; if one of the blocking
 * flags (other than our own SQ_WRITER) appeared meanwhile, back out and
 * start again.
 */
mutex_enter(SQLOCK(outer));
if (outer->sq_flags & (flags & ~SQ_WRITER)) {
mutex_exit(SQLOCK(outer));
outer_exit(outer);
goto retry;
}
mutex_exit(SQLOCK(outer));
}
}
/*
 * Leave the outer perimeter `outer' after having been writer.
 *
 * Any queued writer callbacks are run first via write_now().  SQ_WRITER is
 * then cleared on every inner syncq and on the outer syncq, inner syncqs
 * are drained if any of them has queued work, waiters on the outer syncq
 * are woken, and the outer claim count is decremented.  Must be called
 * without SQLOCK(outer) held.
 */
void
outer_exit(syncq_t *outer)
{
syncq_t *sq;
int drain_needed;
uint16_t flags;
ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
outer->sq_oprev != NULL);
ASSERT(MUTEX_NOT_HELD(SQLOCK(outer)));
mutex_enter(SQLOCK(outer));
flags = outer->sq_flags;
ASSERT(outer->sq_flags & SQ_WRITER);
if (flags & SQ_QUEUED) {
/* write_now() drops and retakes SQLOCK(outer); re-read the flags. */
write_now(outer);
flags = outer->sq_flags;
}
drain_needed = 0;
/* Clear SQ_WRITER on each inner syncq; dropsq() reports queued work. */
for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext)
drain_needed += dropsq(sq, SQ_WRITER);
ASSERT(!(outer->sq_flags & SQ_QUEUED));
flags &= ~SQ_WRITER;
if (drain_needed) {
/*
 * Publish the cleared SQ_WRITER and release the outer lock before
 * draining the inner syncqs; the outer flags are re-read afterwards.
 */
outer->sq_flags = flags;
mutex_exit(SQLOCK(outer));
for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext)
emptysq(sq);
mutex_enter(SQLOCK(outer));
flags = outer->sq_flags;
}
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&outer->sq_wait);
}
outer->sq_flags = flags;
ASSERT(outer->sq_count > 0);
outer->sq_count--;
mutex_exit(SQLOCK(outer));
}
/*
 * Add inner syncq `sq' to the ring of outer perimeter `outer'.
 *
 * The outer syncq is marked SQ_BLOCKED (waiting for its claim count to
 * reach zero) while `sq' is linked in directly after `outer' on the
 * sq_onext/sq_oprev ring; it is unblocked again afterwards.
 */
static void
outer_insert(syncq_t *outer, syncq_t *sq)
{
ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
outer->sq_oprev != NULL);
ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL);
blocksq(outer, SQ_BLOCKED, 0);
ASSERT(outer->sq_flags & SQ_BLOCKED);
ASSERT(!(outer->sq_flags & SQ_WRITER));
mutex_enter(SQLOCK(sq));
sq->sq_outer = outer;
/* Doubly-linked insert between `outer' and its current successor. */
outer->sq_onext->sq_oprev = sq;
sq->sq_onext = outer->sq_onext;
outer->sq_onext = sq;
sq->sq_oprev = outer;
mutex_exit(SQLOCK(sq));
/* isouter == 1: unblocksq() will not drain the outer syncq inline. */
unblocksq(outer, SQ_BLOCKED, 1);
}
/*
 * Remove inner syncq `sq' from the ring of outer perimeter `outer'.
 *
 * The outer syncq is marked SQ_BLOCKED (waiting for its claim count to
 * reach zero) while `sq' is unlinked from the sq_onext/sq_oprev ring and
 * its outer pointer and ring links are cleared; it is unblocked again
 * afterwards.
 */
static void
outer_remove(syncq_t *outer, syncq_t *sq)
{
ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
outer->sq_oprev != NULL);
ASSERT(sq->sq_outer == outer);
blocksq(outer, SQ_BLOCKED, 0);
ASSERT(outer->sq_flags & SQ_BLOCKED);
ASSERT(!(outer->sq_flags & SQ_WRITER));
mutex_enter(SQLOCK(sq));
sq->sq_outer = NULL;
/* Doubly-linked unlink, then clear this syncq's own links. */
sq->sq_onext->sq_oprev = sq->sq_oprev;
sq->sq_oprev->sq_onext = sq->sq_onext;
sq->sq_oprev = sq->sq_onext = NULL;
mutex_exit(SQLOCK(sq));
/* isouter == 1: unblocksq() will not drain the outer syncq inline. */
unblocksq(outer, SQ_BLOCKED, 1);
}
/*
 * Queue a writer callback on outer perimeter `outer'.
 *
 * The callback `func' and target queue `q' are stashed in the message
 * (b_prev and b_queue) and the message is appended to the outer event
 * list.  sq_count is incremented for each queued message.  When the list
 * was empty, SQ_EVENTS is set and qwriter_outer_service() is dispatched on
 * streams_taskq to process it.
 *
 * Called with SQLOCK(outer) held; the lock is released before returning.
 */
static void
queue_writer(syncq_t *outer, void (*func)(), queue_t *q, mblk_t *mp)
{
	ASSERT(MUTEX_HELD(SQLOCK(outer)));

	mp->b_prev = (mblk_t *)func;
	mp->b_queue = q;
	mp->b_next = NULL;

	outer->sq_count++;
	/* A zero value here would mean the counter wrapped. */
	ASSERT(outer->sq_count != 0);

	if (outer->sq_evhead != NULL) {
		/* List already active: just append. */
		ASSERT(outer->sq_flags & SQ_EVENTS);
		outer->sq_evtail->b_next = mp;
		outer->sq_evtail = mp;
		mutex_exit(SQLOCK(outer));
		return;
	}

	/* First entry: start the list and kick off the service task. */
	outer->sq_evtail = mp;
	outer->sq_evhead = mp;
	outer->sq_flags |= SQ_EVENTS;
	mutex_exit(SQLOCK(outer));
	STRSTAT(qwr_outer);
	(void) taskq_dispatch(streams_taskq,
	    (task_func_t *)qwriter_outer_service, outer, TQ_SLEEP);
}
/*
 * Run `func(q, mp)' with exclusive (writer) access to the outer perimeter
 * that the syncq of `q' belongs to, either immediately or by queueing it.
 *
 * The callback is queued with queue_writer() when any SQ_GOAWAY bit is set
 * on the outer syncq, when the calling thread's priority is at or above
 * kpreemptpri, or when some inner syncq has more claims than allowed.
 * Otherwise SQ_WRITER is set on the outer and all inner syncqs, the
 * callback is run directly, and the perimeter is left via outer_exit().
 * Panics if the syncq has no outer perimeter.
 */
void
qwriter_outer(queue_t *q, mblk_t *mp, void (*func)())
{
syncq_t *osq, *sq, *outer;
int failed;
uint16_t flags;
osq = q->q_syncq;
outer = osq->sq_outer;
if (outer == NULL)
panic("qwriter(PERIM_OUTER): no outer perimeter");
ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
outer->sq_oprev != NULL);
mutex_enter(SQLOCK(outer));
flags = outer->sq_flags;
if ((flags & SQ_GOAWAY) || (curthread->t_pri >= kpreemptpri)) {
/*
 * Deferred path.  Make sure SQ_WRITER is set on the caller's own inner
 * syncq before queueing; queue_writer() releases SQLOCK(outer).
 */
if (flags & SQ_BLOCKED) {
mutex_enter(SQLOCK(osq));
osq->sq_flags |= SQ_WRITER;
mutex_exit(SQLOCK(osq));
} else {
if (!(flags & SQ_WRITER)) {
mutex_enter(SQLOCK(osq));
osq->sq_flags |= SQ_WRITER;
mutex_exit(SQLOCK(osq));
}
ASSERT(osq->sq_flags & SQ_WRITER);
}
queue_writer(outer, func, q, mp);
return;
}
/* Take a claim on the outer syncq while the inner ring is walked. */
outer->sq_count++;
/* A zero value here would mean the counter wrapped. */
ASSERT(outer->sq_count != 0);
flags |= SQ_WRITER;
failed = 0;
for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext) {
uint16_t count;
/* The caller's own syncq is allowed one claim; all others none. */
uint_t maxcnt = (sq == osq) ? 1 : 0;
mutex_enter(SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
/*
 * NOTE(review): the comparison uses sq->sq_count, not the summed
 * `count' computed just above -- confirm this is intended.
 */
if (sq->sq_count > maxcnt)
failed = 1;
sq->sq_flags |= SQ_WRITER;
SQ_PUTLOCKS_EXIT(sq);
mutex_exit(SQLOCK(sq));
}
if (failed) {
/*
 * Cannot run now: queue the callback (queue_writer() releases the
 * outer lock and adds its own claim), then retake the lock to drop the
 * claim taken above and clear SQ_WRITER on the outer syncq.
 */
outer->sq_flags = flags;
queue_writer(outer, func, q, mp);
mutex_enter(SQLOCK(outer));
ASSERT(outer->sq_count > 0);
outer->sq_count--;
ASSERT(outer->sq_flags & SQ_WRITER);
flags = outer->sq_flags;
flags &= ~SQ_WRITER;
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&outer->sq_wait);
}
outer->sq_flags = flags;
mutex_exit(SQLOCK(outer));
return;
} else {
outer->sq_flags = flags;
mutex_exit(SQLOCK(outer));
}
/* Exclusive access obtained: run the callback, then leave. */
(*func)(q, mp);
outer_exit(outer);
}
/*
 * Run every writer callback queued on outer perimeter `outer'.
 *
 * Called and returns with SQLOCK(outer) held; the lock is dropped around
 * each callback.  For each message the claim added when it was queued is
 * released, and the callback stored in b_prev is invoked on the queue
 * stored in b_queue -- unless that queue has QWCLOSE set, in which case the
 * message is freed instead.
 */
static void
write_now(syncq_t *outer)
{
mblk_t *mp;
queue_t *q;
void (*func)();
ASSERT(MUTEX_HELD(SQLOCK(outer)));
ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
outer->sq_oprev != NULL);
while ((mp = outer->sq_evhead) != NULL) {
ASSERT(!(outer->sq_flags & SQ_MESSAGES));
ASSERT((outer->sq_flags & SQ_EVENTS));
/* Dequeue the head; clear SQ_EVENTS when the list becomes empty. */
outer->sq_evhead = mp->b_next;
if (outer->sq_evhead == NULL) {
outer->sq_evtail = NULL;
outer->sq_flags &= ~SQ_EVENTS;
}
ASSERT(outer->sq_count != 0);
outer->sq_count--;
mutex_exit(SQLOCK(outer));
q = mp->b_queue;
func = (void (*)())mp->b_prev;
ASSERT(func != NULL);
mp->b_next = mp->b_prev = NULL;
if (q->q_flag & QWCLOSE) {
freemsg(mp);
} else {
/* The queue's syncq is claimed for the duration of the callback. */
claimq(q);
(*func)(q, mp);
releaseq(q);
}
mutex_enter(SQLOCK(outer));
}
ASSERT(MUTEX_HELD(SQLOCK(outer)));
}
/*
 * Run every event callback queued on syncq `sq'.
 *
 * The caller must hold SQLOCK(sq), have SQ_EXCL set, and hold the only
 * claim (sq_count == 1).  The lock is dropped around each callback and is
 * held again on return.  SQ_EVENTS is cleared once the list is empty, and
 * waiters on sq_wait and sq_exitwait are woken before returning.  SQ_EXCL
 * is left set.
 */
void
sq_run_events(syncq_t *sq)
{
mblk_t *bp;
queue_t *qp;
uint16_t flags = sq->sq_flags;
void (*func)();
ASSERT(MUTEX_HELD(SQLOCK(sq)));
ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL) ||
(sq->sq_outer != NULL && sq->sq_onext != NULL &&
sq->sq_oprev != NULL));
ASSERT(flags & SQ_EXCL);
ASSERT(sq->sq_count == 1);
/* The head is re-read each time: callbacks may queue further events. */
for (bp = sq->sq_evhead; bp != NULL; bp = sq->sq_evhead) {
ASSERT(bp->b_queue->q_syncq == sq);
ASSERT(sq->sq_flags & SQ_EVENTS);
/* The target queue and callback were stored in the message itself. */
qp = bp->b_queue;
func = (void (*)())bp->b_prev;
ASSERT(func != NULL);
ASSERT(sq->sq_evhead == bp);
sq->sq_evhead = bp->b_next;
if (bp->b_next == NULL) {
ASSERT(sq->sq_evtail == bp);
sq->sq_evtail = NULL;
sq->sq_flags &= ~SQ_EVENTS;
}
bp->b_prev = bp->b_next = NULL;
ASSERT(bp->b_datap->db_ref != 0);
mutex_exit(SQLOCK(sq));
(*func)(qp, bp);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
ASSERT(flags & SQ_EXCL);
}
ASSERT(sq->sq_evhead == NULL && sq->sq_evtail == NULL);
ASSERT(!(sq->sq_flags & SQ_EVENTS));
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
if (flags & SQ_WANTEXWAKEUP) {
flags &= ~SQ_WANTEXWAKEUP;
cv_broadcast(&sq->sq_exitwait);
}
sq->sq_flags = flags;
}
/*
 * Queue event callback `func' for queue `q' on syncq `sq', using message
 * `mp' as the carrier, and run the event list right away if possible.
 *
 * Called with SQLOCK(sq) held; the lock is released before returning on
 * every path.  If any thread holds a claim on the syncq the event is left
 * queued.  Otherwise this thread takes an exclusive claim, runs the events
 * with sq_run_events(), and then drains the syncq if work is queued and
 * SQ_STAYAWAY does not forbid it.
 */
static void
sqfill_events(syncq_t *sq, queue_t *q, mblk_t *mp, void (*func)())
{
uint16_t count;
ASSERT(MUTEX_HELD(SQLOCK(sq)));
ASSERT(func != NULL);
/* Stash the callback and target queue in the message and append it. */
mp->b_prev = (mblk_t *)func;
mp->b_queue = q;
mp->b_next = NULL;
if (sq->sq_evhead == NULL) {
sq->sq_evhead = sq->sq_evtail = mp;
sq->sq_flags |= SQ_EVENTS;
} else {
ASSERT(sq->sq_evtail != NULL);
ASSERT(sq->sq_evtail->b_next == NULL);
ASSERT(sq->sq_flags & SQ_EVENTS);
sq->sq_evtail->b_next = mp;
sq->sq_evtail = mp;
}
/* Sum all claims under the putlocks to see whether anyone is inside. */
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
if (count > 0) {
/* Someone else holds a claim; leave the event queued. */
SQ_PUTLOCKS_EXIT(sq);
mutex_exit(SQLOCK(sq));
return;
}
/* Nobody inside: go exclusive and take the single claim. */
ASSERT((sq->sq_flags & SQ_EXCL) == 0);
sq->sq_flags |= SQ_EXCL;
ASSERT(sq->sq_count == 0);
sq->sq_count++;
SQ_PUTLOCKS_EXIT(sq);
sq_run_events(sq);
/* Give back the claim and exclusivity. */
sq->sq_count--;
sq->sq_flags &= ~SQ_EXCL;
ASSERT(!(sq->sq_flags & SQ_EXCL));
/* drain_syncq() releases SQLOCK(sq); otherwise release it here. */
if (!(sq->sq_flags & SQ_STAYAWAY) && (sq->sq_flags & SQ_QUEUED))
drain_syncq(sq);
else
mutex_exit(SQLOCK(sq));
}
/*
 * Process the work queued on syncq `sq': event callbacks first, then the
 * messages of each queue on the syncq's queue list.
 *
 * Called with SQLOCK(sq) held; the lock is released before returning on
 * every path.  The function returns without doing any work if the syncq is
 * already exclusive, if nothing is queued, or -- when SQ_CIPUT is not set in
 * sq_type -- if more than one claim is outstanding.  When invoked as a
 * background service (SQ_SERVICE set in sq_svcflags on entry) that flag is
 * cleared and sq_servcount is decremented on the way out.
 */
void
drain_syncq(syncq_t *sq)
{
queue_t *qp;
uint16_t count;
uint16_t type = sq->sq_type;
uint16_t flags = sq->sq_flags;
boolean_t bg_service = sq->sq_svcflags & SQ_SERVICE;
TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_START,
"drain_syncq start:%p", sq);
ASSERT(MUTEX_HELD(SQLOCK(sq)));
ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL) ||
(sq->sq_outer != NULL && sq->sq_onext != NULL &&
sq->sq_oprev != NULL));
if (bg_service)
sq->sq_svcflags &= ~SQ_SERVICE;
/* Somebody already has the syncq exclusively: nothing to do here. */
if (flags & SQ_EXCL) {
if (bg_service) {
ASSERT(sq->sq_servcount != 0);
sq->sq_servcount--;
}
mutex_exit(SQLOCK(sq));
return;
}
/* Nothing queued: just deliver any requested wakeups. */
if (!(flags & SQ_QUEUED)) {
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
if (flags & SQ_WANTEXWAKEUP) {
flags &= ~SQ_WANTEXWAKEUP;
cv_broadcast(&sq->sq_exitwait);
}
sq->sq_flags = flags;
if (bg_service) {
ASSERT(sq->sq_servcount != 0);
sq->sq_servcount--;
}
mutex_exit(SQLOCK(sq));
return;
}
type = sq->sq_type;
if (!(type & SQ_CIPUT)) {
/*
 * Without SQ_CIPUT the drain runs exclusively, so give up if more than
 * one claim is outstanding; otherwise mark the syncq SQ_EXCL.
 */
if (sq->sq_count > 1) {
if (bg_service) {
ASSERT(sq->sq_servcount != 0);
sq->sq_servcount--;
}
mutex_exit(SQLOCK(sq));
return;
}
sq->sq_flags |= SQ_EXCL;
}
/* Take this thread's own claim for the duration of the drain. */
sq->sq_count++;
/* A zero value here would mean the counter wrapped. */
ASSERT(sq->sq_count != 0);
while ((flags = sq->sq_flags) & SQ_QUEUED) {
if (flags & (SQ_STAYAWAY)) {
break;
}
if (sq->sq_evhead != NULL) {
/*
 * Events are pending.  They can only be run if ours is the sole claim
 * (count includes the increment made above).
 */
ASSERT(sq->sq_flags & SQ_EVENTS);
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
if (count > 1) {
SQ_PUTLOCKS_EXIT(sq);
break;
}
ASSERT((flags & SQ_EXCL) == 0);
sq->sq_flags = flags | SQ_EXCL;
SQ_PUTLOCKS_EXIT(sq);
sq_run_events(sq);
/* With SQ_CIPUT, exclusivity is dropped again once events have run. */
if (type & SQ_CIPUT) {
ASSERT(sq->sq_flags & SQ_EXCL);
sq->sq_flags &= ~SQ_EXCL;
}
ASSERT((sq->sq_flags & SQ_EXCL) || (type & SQ_CIPUT));
continue;
}
ASSERT(sq->sq_evhead == NULL);
ASSERT(!(sq->sq_flags & SQ_EVENTS));
/* Find the first queue on the list that nobody is draining yet. */
for (qp = sq->sq_head;
qp != NULL && (qp->q_draining ||
(qp->q_sqflags & Q_SQDRAINING));
qp = qp->q_sqnext)
;
if (qp == NULL)
break;
/*
 * Mark the queue, swap SQLOCK for QLOCK(qp) and drain it.
 * qdrain_syncq() releases QLOCK(qp) before it returns.
 */
qp->q_sqflags |= Q_SQDRAINING;
mutex_exit(SQLOCK(sq));
mutex_enter(QLOCK(qp));
qdrain_syncq(sq, qp);
mutex_enter(SQLOCK(sq));
ASSERT(qp->q_sqflags & Q_SQDRAINING);
qp->q_sqflags &= ~Q_SQDRAINING;
}
ASSERT(MUTEX_HELD(SQLOCK(sq)));
flags = sq->sq_flags;
ASSERT((sq->sq_head == NULL) || (flags & SQ_GOAWAY) ||
(type & SQ_CI) || sq->sq_head->q_draining);
/* Release the exclusivity taken above for the non-SQ_CIPUT case. */
if (!(type & SQ_CIPUT))
flags &= ~SQ_EXCL;
ASSERT((flags & SQ_EXCL) == 0);
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
if (flags & SQ_WANTEXWAKEUP) {
flags &= ~SQ_WANTEXWAKEUP;
cv_broadcast(&sq->sq_exitwait);
}
sq->sq_flags = flags;
/* Drop this thread's claim. */
ASSERT(sq->sq_count != 0);
sq->sq_count--;
if (bg_service) {
ASSERT(sq->sq_servcount != 0);
sq->sq_servcount--;
}
mutex_exit(SQLOCK(sq));
TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_END,
"drain_syncq end:%p", sq);
}
/*
 * Deliver the messages queued for `q' on its syncq `sq' by calling the
 * queue's put procedure for each one.
 *
 * Called with QLOCK(q) held and SQLOCK(sq) not held; QLOCK(q) is released
 * before returning.  Returns at once if another thread is already draining
 * this queue (q_draining set).  Delivery stops early if SQ_STAYAWAY or
 * SQ_EVENTS bits appear on the syncq.  When the queue's list has been
 * emptied the queue is taken off the syncq's queue list (if it was on it)
 * and q_spri is reset.
 */
void
qdrain_syncq(syncq_t *sq, queue_t *q)
{
mblk_t *bp;
#ifdef DEBUG
uint16_t count;
#endif
TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_START,
"drain_syncq start:%p", sq);
ASSERT(q->q_syncq == sq);
ASSERT(MUTEX_HELD(QLOCK(q)));
ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
ASSERT((sq->sq_flags & (SQ_EXCL|SQ_CIPUT)));
ASSERT(!((sq->sq_type & SQ_CIPUT) && (sq->sq_flags & SQ_EXCL)));
ASSERT((sq->sq_type & SQ_CIPUT) || (sq->sq_flags & SQ_EXCL));
ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL) ||
(sq->sq_outer != NULL && sq->sq_onext != NULL &&
sq->sq_oprev != NULL));
#ifdef DEBUG
/* The caller is expected to hold at least one claim on the syncq. */
count = sq->sq_count;
SUM_SQ_PUTCOUNTS(sq, count);
ASSERT(count >= 1);
#endif
if (q->q_draining) {
mutex_exit(QLOCK(q));
return;
}
q->q_draining = 1;
if (q->q_sqhead == NULL) {
/* Nothing queued: clr_qfull() is called with QLOCK(q) dropped. */
ASSERT(q->q_syncqmsgs == 0);
mutex_exit(QLOCK(q));
clr_qfull(q);
mutex_enter(QLOCK(q));
}
/* The head is re-read each time since QLOCK(q) is dropped in the loop. */
for (bp = q->q_sqhead; bp != NULL; bp = q->q_sqhead) {
if (sq->sq_flags & (SQ_STAYAWAY | SQ_EVENTS)) {
break;
}
#ifdef DEBUG
ASSERT(bp->b_queue == q);
ASSERT(bp->b_queue->q_syncq == sq);
bp->b_queue = NULL;
#endif
ASSERT(q->q_sqhead == bp);
q->q_sqhead = bp->b_next;
bp->b_prev = bp->b_next = NULL;
ASSERT(q->q_syncqmsgs > 0);
mutex_exit(QLOCK(q));
ASSERT(bp->b_datap->db_ref != 0);
/* Deliver the message through the queue's put procedure. */
(void) (*q->q_qinfo->qi_putp)(q, bp);
mutex_enter(QLOCK(q));
/* q_syncqmsgs is decremented only after the put procedure returns. */
if (--q->q_syncqmsgs == 0) {
mutex_exit(QLOCK(q));
clr_qfull(q);
mutex_enter(QLOCK(q));
}
/* If SQ_EXCL got set on an SQ_CIPUT syncq meanwhile, clear it again. */
if ((sq->sq_type & SQ_CIPUT) && (sq->sq_flags & SQ_EXCL)) {
mutex_enter(SQLOCK(sq));
sq->sq_flags &= ~SQ_EXCL;
mutex_exit(SQLOCK(sq));
}
}
ASSERT((q->q_sqhead == NULL) ||
(sq->sq_flags & (SQ_STAYAWAY | SQ_EVENTS)));
ASSERT(MUTEX_HELD(QLOCK(q)));
ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
if (q->q_sqhead == NULL) {
/* Fully drained: unlink the queue from the syncq's queue list. */
ASSERT(q->q_syncqmsgs == 0);
mutex_enter(SQLOCK(sq));
if (q->q_sqflags & Q_SQQUEUED)
SQRM_Q(sq, q);
mutex_exit(SQLOCK(sq));
q->q_spri = 0;
}
ASSERT(q->q_draining);
q->q_draining = 0;
mutex_exit(QLOCK(q));
TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_END,
"drain_syncq end:%p", sq);
}
/*
 * Defer message mp by appending it to queue q's deferred list and making
 * sure q is linked on syncq sq.
 *
 * Entry: neither SQLOCK(sq) nor QLOCK(q) held; the caller holds a claim on
 *        sq (sq_count > 0), which this function gives up.
 * Exit:  via putnext_tail(), which is entered with SQLOCK(sq) held.
 */
void
qfill_syncq(syncq_t *sq, queue_t *q, mblk_t *mp)
{
ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
ASSERT(MUTEX_NOT_HELD(QLOCK(q)));
ASSERT(sq->sq_count > 0);
ASSERT(q->q_syncq == sq);
/* The three outer-perimeter pointers are either all set or all NULL. */
ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL) ||
(sq->sq_outer != NULL && sq->sq_onext != NULL &&
sq->sq_oprev != NULL));
mutex_enter(QLOCK(q));
#ifdef DEBUG
/* Debug builds record the target put procedure and queue in the mblk. */
mp->b_prev = (mblk_t *)q->q_qinfo->qi_putp;
mp->b_queue = q;
mp->b_next = NULL;
#endif
ASSERT(q->q_syncq == sq);
/* Append the message while holding QLOCK(q); SQLOCK(sq) is taken after. */
SQPUT_MP(q, mp);
mutex_enter(SQLOCK(sq));
/* First deferred message for q: record priority and link q on the syncq. */
if (!(q->q_sqflags & Q_SQQUEUED)) {
q->q_spri = curthread->t_pri;
SQPUT_Q(sq, q);
}
#ifdef DEBUG
/* Already linked: sanity-check the syncq's queue list (debug only). */
else {
ASSERT(sq->sq_tail != NULL);
if (sq->sq_tail == sq->sq_head) {
ASSERT((q->q_sqprev == NULL) &&
(q->q_sqnext == NULL));
} else {
ASSERT((q->q_sqprev != NULL) ||
(q->q_sqnext != NULL));
}
ASSERT(sq->sq_flags & SQ_QUEUED);
ASSERT(q->q_syncqmsgs != 0);
ASSERT(q->q_sqflags & Q_SQQUEUED);
}
#endif
mutex_exit(QLOCK(q));
/* SQLOCK(sq) is still held, so the claim can be dropped here. */
sq->sq_count--;
/* putnext_tail() is called with SQLOCK(sq) held and does the wakeups. */
putnext_tail(sq, q, 0);
}
/*
 * Discard deferred work on syncq sq.
 *
 * If qp is non-NULL, events on sq's event list whose b_queue is qp are
 * freed, and then the deferred messages of qp alone are freed.  If qp is
 * NULL, the deferred messages of every queue linked on sq are freed (the
 * event list is not touched in that case).
 *
 * Takes and releases SQLOCK(sq) itself.  Returns the number of events and
 * messages that were freed.
 */
int
flush_syncq(syncq_t *sq, queue_t *qp)
{
mblk_t *bp, *mp_head, *mp_next, *mp_prev;
queue_t *q;
int ret = 0;
mutex_enter(SQLOCK(sq));
/* Remove and free every event that targets qp. */
if (qp != NULL && sq->sq_evhead != NULL) {
ASSERT(sq->sq_flags & SQ_EVENTS);
mp_prev = NULL;
for (bp = sq->sq_evhead; bp != NULL; bp = mp_next) {
mp_next = bp->b_next;
if (bp->b_queue == qp) {
if (mp_prev != NULL) {
mp_prev->b_next = mp_next;
/* Removing the last element: move the tail pointer back. */
if (bp == sq->sq_evtail) {
ASSERT(mp_next == NULL);
sq->sq_evtail = mp_prev;
}
} else
sq->sq_evhead = mp_next;
/* Event list became empty: clear the SQ_EVENTS flag. */
if (sq->sq_evhead == NULL)
sq->sq_flags &= ~SQ_EVENTS;
bp->b_prev = bp->b_next = NULL;
freemsg(bp);
ret++;
} else {
mp_prev = bp;
}
}
}
/* Walk the queues linked on the syncq, flushing qp (or all if qp is NULL). */
q = sq->sq_head;
while (q != NULL) {
ASSERT(q->q_syncq == sq);
if ((qp == NULL) || (qp == q)) {
/* Detach the whole message list; QLOCK(q) is not taken here. */
mp_head = q->q_sqhead;
q->q_sqhead = q->q_sqtail = NULL;
ASSERT(mp_head->b_queue &&
mp_head->b_queue->q_syncq == sq);
for (bp = mp_head; bp != NULL; bp = mp_next) {
mp_next = bp->b_next;
bp->b_prev = bp->b_next = NULL;
freemsg(bp);
ret++;
}
/* Unlink the queue from the syncq and reset its bookkeeping. */
ASSERT(q->q_sqflags & Q_SQQUEUED);
SQRM_Q(sq, q);
q->q_spri = 0;
q->q_syncqmsgs = 0;
/* Single-queue case returns below, so wake waiters while still locked. */
if ((qp != NULL) && (sq->sq_flags & SQ_WANTWAKEUP)) {
sq->sq_flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
/* SQLOCK(sq) is dropped across clr_qfull(). */
mutex_exit(SQLOCK(sq));
clr_qfull(q);
if (qp != NULL) {
return (ret);
} else {
/* The list changed while unlocked; restart from the new head. */
mutex_enter(SQLOCK(sq));
q = sq->sq_head;
}
} else {
q = q->q_sqnext;
}
ASSERT(MUTEX_HELD(SQLOCK(sq)));
}
/* Wake anyone waiting on the syncq before leaving. */
if (sq->sq_flags & SQ_WANTWAKEUP) {
sq->sq_flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
mutex_exit(SQLOCK(sq));
return (ret);
}
/*
 * Move the deferred messages of queue qp to the next queue (qp->q_next),
 * or free them if qp has QISDRV set, and free every event on qp's syncq
 * that targets qp.
 *
 * Entry: SQLOCK of qp's syncq held; for a non-driver queue the SQLOCK of
 *        the next queue's syncq is asserted held as well.  Locks are not
 *        released here.
 * Returns the number of messages moved to the next queue.
 */
static int
propagate_syncq(queue_t *qp)
{
mblk_t *bp, *head, *tail, *prev, *next;
syncq_t *sq;
queue_t *nqp;
syncq_t *nsq;
boolean_t isdriver;
int moved = 0;
uint16_t flags;
pri_t priority = curthread->t_pri;
#ifdef DEBUG
void (*func)();
#endif
sq = qp->q_syncq;
ASSERT(MUTEX_HELD(SQLOCK(sq)));
SQ_PUTLOCKS_HELD(sq);
/* sq_count is only checked for queues without QPERQ. */
ASSERT((qp->q_flag & QPERQ) || (sq->sq_count >= 1));
if (qp->q_syncqmsgs) {
isdriver = (qp->q_flag & QISDRV);
/* nqp/nsq (and func in DEBUG) are only set for non-driver queues. */
if (!isdriver) {
nqp = qp->q_next;
nsq = nqp->q_syncq;
ASSERT(MUTEX_HELD(SQLOCK(nsq)));
SQ_PUTLOCKS_HELD(nsq);
#ifdef DEBUG
func = (void (*)())(uintptr_t)nqp->q_qinfo->qi_putp;
#endif
}
/* Unlink qp from its syncq and take its whole message list. */
SQRM_Q(sq, qp);
priority = MAX(qp->q_spri, priority);
qp->q_spri = 0;
head = qp->q_sqhead;
tail = qp->q_sqtail;
qp->q_sqhead = qp->q_sqtail = NULL;
qp->q_syncqmsgs = 0;
/* Free each message for a driver; otherwise retarget it at nqp. */
for (bp = head; bp != NULL; bp = next) {
next = bp->b_next;
if (isdriver) {
bp->b_prev = bp->b_next = NULL;
freemsg(bp);
continue;
}
bp->b_queue = nqp;
#ifdef DEBUG
bp->b_prev = (mblk_t *)func;
#endif
moved++;
}
/* Splice the retargeted list onto the end of nqp's deferred list. */
if (!isdriver && head != NULL) {
ASSERT(tail != NULL);
if (nqp->q_sqhead == NULL) {
nqp->q_sqhead = head;
} else {
ASSERT(nqp->q_sqtail != NULL);
nqp->q_sqtail->b_next = head;
}
nqp->q_sqtail = tail;
/* The destination inherits the higher of the two priorities. */
if (priority > nqp->q_spri)
nqp->q_spri = priority;
SQPUT_Q(nsq, nqp);
nqp->q_syncqmsgs += moved;
ASSERT(nqp->q_syncqmsgs != 0);
}
}
/* Remove and free every event on the syncq that targets qp. */
if (sq->sq_evhead != NULL) {
ASSERT(sq->sq_flags & SQ_EVENTS);
prev = NULL;
for (bp = sq->sq_evhead; bp != NULL; bp = next) {
next = bp->b_next;
if (bp->b_queue == qp) {
if (prev != NULL) {
prev->b_next = next;
/* Removing the last element: move the tail pointer back. */
if (bp == sq->sq_evtail) {
ASSERT(next == NULL);
sq->sq_evtail = prev;
}
} else
sq->sq_evhead = next;
if (sq->sq_evhead == NULL)
sq->sq_flags &= ~SQ_EVENTS;
bp->b_prev = bp->b_next = NULL;
freemsg(bp);
} else {
prev = bp;
}
}
}
/* Wake any waiter on the syncq before returning. */
flags = sq->sq_flags;
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
sq->sq_flags = flags;
return (moved);
}
/*
 * Run func(q, mp) with exclusive access to q's syncq when possible.
 *
 * The claims on the syncq (sq_count plus the summed put counts) are
 * totalled under SQLOCK and the put locks.  If the caller's claim is the
 * only one, SQ_EXCL is set, the locks are dropped and func is called
 * directly; SQ_EXCL is left set on return.  Otherwise the request is passed
 * to sqfill_events(), which is entered with SQLOCK(sq) still held.
 */
void
qwriter_inner(queue_t *q, mblk_t *mp, void (*func)())
{
	syncq_t *sq = q->q_syncq;
	uint16_t claims;

	mutex_enter(SQLOCK(sq));
	claims = sq->sq_count;
	SQ_PUTLOCKS_ENTER(sq);
	SUM_SQ_PUTCOUNTS(sq, claims);
	ASSERT(claims >= 1);
	ASSERT(sq->sq_type & (SQ_CIPUT|SQ_CISVC));

	if (claims != 1) {
		/* Other claims exist: hand the request to sqfill_events(). */
		SQ_PUTLOCKS_EXIT(sq);
		sqfill_events(sq, q, mp, func);
		return;
	}

	/* Sole claim: upgrade to exclusive and call the function now. */
	sq->sq_flags |= SQ_EXCL;
	SQ_PUTLOCKS_EXIT(sq);
	mutex_exit(SQLOCK(sq));
	func(q, mp);
}
/*
 * Allocate a callback-parameter record for (func, arg) and push it on the
 * front of sq's pending-callback list (sq_callbpend).
 *
 * The first allocation attempt is made with KM_PANIC masked out.  If it
 * fails, NULL is returned unless the caller passed KM_PANIC, in which case
 * kmem_alloc_tryhard() is used with the caller's flags.
 *
 * On success the function returns with SQLOCK(sq) HELD; the caller must
 * release it.  On a NULL return the lock has not been taken.
 */
callbparams_t *
callbparams_alloc(syncq_t *sq, void (*func)(void *), void *arg, int kmflags)
{
	size_t size = sizeof (callbparams_t);
	callbparams_t *cbp = kmem_alloc(size, kmflags & ~KM_PANIC);

	if (cbp == NULL) {
		if (!(kmflags & KM_PANIC))
			return (NULL);
		/* tryhard may return a different size; it is stored for the free. */
		cbp = kmem_alloc_tryhard(sizeof (callbparams_t),
		    &size, kmflags);
	}
	ASSERT(size >= sizeof (callbparams_t));

	cbp->cbp_size = size;
	cbp->cbp_sq = sq;
	cbp->cbp_func = func;
	cbp->cbp_arg = arg;

	/* Link at the head of the pending list; lock stays held on return. */
	mutex_enter(SQLOCK(sq));
	cbp->cbp_next = sq->sq_callbpend;
	sq->sq_callbpend = cbp;
	return (cbp);
}
/*
 * Unlink cbp from sq's pending-callback list and free it using the size
 * recorded in the entry.  SQLOCK(sq) must be held by the caller.
 * Logs a console message if cbp is not on the list.
 */
void
callbparams_free(syncq_t *sq, callbparams_t *cbp)
{
	callbparams_t **link = &sq->sq_callbpend;
	callbparams_t *cur;

	ASSERT(MUTEX_HELD(SQLOCK(sq)));

	while ((cur = *link) != NULL) {
		if (cur == cbp) {
			*link = cur->cbp_next;
			kmem_free(cur, cur->cbp_size);
			return;
		}
		link = &cur->cbp_next;
	}
	(void) (STRLOG(0, 0, 0, SL_CONSOLE,
	    "callbparams_free: not found\n"));
}
/*
 * Unlink and free the first pending-callback entry on sq whose cbp_id
 * equals id and whose cbp_flags equals flag.  SQLOCK(sq) must be held by
 * the caller.  Logs a console message if no entry matches.
 */
void
callbparams_free_id(syncq_t *sq, callbparams_id_t id, int32_t flag)
{
	callbparams_t **link = &sq->sq_callbpend;
	callbparams_t *cur;

	ASSERT(MUTEX_HELD(SQLOCK(sq)));

	while ((cur = *link) != NULL) {
		if (cur->cbp_id == id && cur->cbp_flags == flag) {
			*link = cur->cbp_next;
			kmem_free(cur, cur->cbp_size);
			return;
		}
		link = &cur->cbp_next;
	}
	(void) (STRLOG(0, 0, 0, SL_CONSOLE,
	    "callbparams_free_id: not found\n"));
}
/*
 * Wrapper that runs a pending callback (cbp_func(cbp_arg)) inside the
 * perimeter of the syncq recorded in the callbparams_t passed as arg.
 *
 * If the syncq type lacks SQ_CICB the callback is run with SQ_EXCL set and
 * the wrapper first waits until there are no other claims.  While waiting,
 * a cancellation that matches this callback (sq_callbflags/sq_cancelid)
 * causes the entry to be freed and the wrapper to return without calling
 * the function.
 */
void
qcallbwrapper(void *arg)
{
callbparams_t *cbp = arg;
syncq_t *sq;
uint16_t count = 0;
uint16_t waitflags = SQ_STAYAWAY | SQ_EVENTS | SQ_EXCL;
uint16_t type;
sq = cbp->cbp_sq;
mutex_enter(SQLOCK(sq));
type = sq->sq_type;
/*
 * Exclusive case: total the existing claims, register the need for
 * exclusive access, and also wait for SQ_MESSAGES to clear.
 */
if (!(type & SQ_CICB)) {
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
SUM_SQ_PUTCOUNTS(sq, count);
sq->sq_needexcl++;
ASSERT(sq->sq_needexcl != 0);
waitflags |= SQ_MESSAGES;
}
ASSERT(type & SQ_COCB);
/* Wait until no blocking flag is set and (exclusive case) no claims remain. */
while ((sq->sq_flags & waitflags) || (!(type & SQ_CICB) &&count != 0)) {
/* This callback was cancelled while waiting: free it and bail out. */
if ((sq->sq_callbflags & cbp->cbp_flags) &&
(sq->sq_cancelid == cbp->cbp_id)) {
sq->sq_callbflags |= SQ_CALLB_BYPASSED;
callbparams_free(sq, cbp);
if (!(type & SQ_CICB)) {
ASSERT(sq->sq_needexcl > 0);
sq->sq_needexcl--;
if (sq->sq_needexcl == 0) {
SQ_PUTCOUNT_SETFAST_LOCKED(sq);
}
SQ_PUTLOCKS_EXIT(sq);
}
mutex_exit(SQLOCK(sq));
return;
}
sq->sq_flags |= SQ_WANTWAKEUP;
/* The put locks are released before sleeping and re-taken afterwards. */
if (!(type & SQ_CICB)) {
SQ_PUTLOCKS_EXIT(sq);
}
cv_wait(&sq->sq_wait, SQLOCK(sq));
if (!(type & SQ_CICB)) {
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
}
/* Take our own claim on the syncq. */
sq->sq_count++;
ASSERT(sq->sq_count != 0);
if (!(type & SQ_CICB)) {
ASSERT(count == 0);
sq->sq_flags |= SQ_EXCL;
ASSERT(sq->sq_needexcl > 0);
sq->sq_needexcl--;
if (sq->sq_needexcl == 0) {
SQ_PUTCOUNT_SETFAST_LOCKED(sq);
}
SQ_PUTLOCKS_EXIT(sq);
}
mutex_exit(SQLOCK(sq));
/* Run the user callback without SQLOCK held. */
cbp->cbp_func(cbp->cbp_arg);
/* Free the entry under SQLOCK, then give up the claim via leavesq(). */
mutex_enter(SQLOCK(sq));
callbparams_free(sq, cbp);
mutex_exit(SQLOCK(sq));
leavesq(sq, SQ_CALLBACK);
}
/*
 * Common exit processing for a syncq: optionally clear SQ_EXCL, wake
 * waiters, and either drain the syncq or schedule it for draining.
 *
 * Entry: SQLOCK(sq) held, QLOCK(qp) not held.
 * Exit:  SQLOCK(sq) is released here, except on the drain_syncq() path
 *        where the lock is handed to drain_syncq() (expected to release it).
 *
 * passflags: SQ_EXCL requests that SQ_EXCL be cleared; SQ_QUEUED requests
 *            that the syncq be drained by this thread.
 */
void
putnext_tail(syncq_t *sq, queue_t *qp, uint32_t passflags)
{
uint16_t flags = sq->sq_flags;
ASSERT(MUTEX_HELD(SQLOCK(sq)));
ASSERT(MUTEX_NOT_HELD(QLOCK(qp)));
if (passflags & SQ_EXCL) {
flags &= ~SQ_EXCL;
}
/* Wake threads waiting to enter and threads waiting for an exit. */
if (flags & SQ_WANTWAKEUP) {
flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
if (flags & SQ_WANTEXWAKEUP) {
flags &= ~SQ_WANTEXWAKEUP;
cv_broadcast(&sq->sq_exitwait);
}
sq->sq_flags = flags;
/*
 * With no stay-away flag and no exclusive holder: drain here if asked to
 * (or if SQ_DISABLED is set in sq_svcflags), otherwise call sqenable()
 * when work is queued.
 */
if (!(flags & (SQ_STAYAWAY|SQ_EXCL))) {
if ((passflags & SQ_QUEUED) ||
(sq->sq_svcflags & SQ_DISABLED)) {
/* Returns without mutex_exit: drain_syncq() owns SQLOCK from here. */
drain_syncq(sq);
return;
} else if (flags & SQ_QUEUED) {
sqenable(sq);
}
}
mutex_exit(SQLOCK(sq));
TRACE_3(TR_FAC_STREAMS_FR, TR_PUTNEXT_END,
"putnext_end:(%p, %p, %p) done", NULL, qp, sq);
}
/*
 * Recompute the QEND flag on q and on its partner queue (_OTHERQ(q)).
 *
 * For each of the two queues, under its own QLOCK: QEND is cleared when
 * O_SAMESTR() is true for the queue and set otherwise.
 */
void
set_qend(queue_t *q)
{
	queue_t *partner;

	mutex_enter(QLOCK(q));
	if (O_SAMESTR(q)) {
		q->q_flag &= ~QEND;
	} else {
		q->q_flag |= QEND;
	}
	mutex_exit(QLOCK(q));

	partner = _OTHERQ(q);
	mutex_enter(QLOCK(partner));
	if (O_SAMESTR(partner)) {
		partner->q_flag &= ~QEND;
	} else {
		partner->q_flag |= QEND;
	}
	mutex_exit(QLOCK(partner));
}
/*
 * Mark the queue q->q_nfsrv as QFULL when q has more than sq_max_size
 * deferred syncq messages.
 *
 * Nothing is done when sq_max_size is 0, when the target already has QFULL
 * set, or when q_syncqmsgs does not exceed sq_max_size.
 *
 * Called and returns with QLOCK(q) held.  When the target is a different
 * queue, QLOCK(q) is dropped while the target's lock is taken and is
 * re-acquired before returning.
 */
void
set_qfull(queue_t *q)
{
	queue_t *fq;

	ASSERT(MUTEX_HELD(QLOCK(q)));

	if (sq_max_size == 0 || (q->q_nfsrv->q_flag & QFULL) ||
	    q->q_syncqmsgs <= sq_max_size)
		return;

	fq = q->q_nfsrv;
	if (fq == q) {
		/* Same queue: its lock is already held. */
		fq->q_flag |= QFULL;
		return;
	}

	/* Different queue: swap locks so only one QLOCK is held at a time. */
	mutex_exit(QLOCK(q));
	mutex_enter(QLOCK(fq));
	fq->q_flag |= QFULL;
	mutex_exit(QLOCK(fq));
	mutex_enter(QLOCK(q));
}
/*
 * Clear QFULL on q->q_nfsrv when that queue is no longer over its
 * high-water mark, and back-enable from q when a writer is waiting and the
 * queue has fallen below its low-water mark.
 *
 * An unlocked check of the flags is done first; the target's QLOCK is
 * only taken when QFULL or QWANTW is set.  backenable() is called after the
 * lock has been released, with the original q as its argument.
 */
void
clr_qfull(queue_t *q)
{
	queue_t *fq = q->q_nfsrv;
	int wake = 0;

	/* Unlocked fast path: nothing to clear. */
	if ((fq->q_flag & (QFULL|QWANTW)) == 0)
		return;

	mutex_enter(QLOCK(fq));
	/* Empty, or both byte count and message count are below hiwat. */
	if (fq->q_mblkcnt == 0 ||
	    (fq->q_count < fq->q_hiwat && fq->q_mblkcnt < fq->q_hiwat)) {
		fq->q_flag &= ~QFULL;
		/* Writer waiting and below lowat (or lowat is zero). */
		if ((fq->q_flag & QWANTW) &&
		    ((fq->q_count < fq->q_lowat &&
		    fq->q_mblkcnt < fq->q_lowat) || fq->q_lowat == 0)) {
			fq->q_flag &= ~QWANTW;
			wake = 1;
		}
	}
	mutex_exit(QLOCK(fq));

	if (wake)
		backenable(q, 0);
}
/*
 * Set the q_nfsrv pointers for a newly added queue pair.
 *
 *   rnew, wnew       read/write queues of the new pair
 *   prev_rq, prev_wq read/write queues of the pair it is placed below
 *
 * If prev_wq has no next queue, the new pair's pointers are set and
 * prev_rq/prev_wq are made to point at themselves.  Otherwise the read
 * side is set up first and then the write side; queues reached through
 * backq() whose q_nfsrv is not themselves are redirected to the new queue
 * when it has a service procedure.
 */
void
set_nfsrv_ptr(queue_t *rnew, queue_t *wnew, queue_t *prev_rq,
    queue_t *prev_wq)
{
	queue_t *walk;

	if (prev_wq->q_next == NULL) {
		/* Nothing below prev_wq yet; insertion is not allowed here. */
		ASSERT(!(rnew->q_flag & _QINSERTING));
		wnew->q_nfsrv = wnew;
		rnew->q_nfsrv = (rnew->q_qinfo->qi_srvp) ? rnew : prev_rq;
		prev_rq->q_nfsrv = prev_rq;
		prev_wq->q_nfsrv = prev_wq;
		return;
	}

	/* Read side first; the write side below may read rnew->q_nfsrv. */
	if (rnew->q_qinfo->qi_srvp) {
		for (walk = _OTHERQ(prev_wq->q_next);
		    walk && walk->q_nfsrv != walk;
		    walk = backq(walk)) {
			walk->q_nfsrv = rnew;
		}
		rnew->q_nfsrv = rnew;
	} else {
		rnew->q_nfsrv = prev_rq->q_nfsrv;
	}

	/* Write side without a service procedure: inherit. */
	if (!wnew->q_qinfo->qi_srvp) {
		if (prev_wq->q_next == prev_rq) {
			wnew->q_nfsrv = rnew->q_nfsrv;
		} else {
			wnew->q_nfsrv = prev_wq->q_next->q_nfsrv;
		}
		return;
	}

	/* Write side with a service procedure. */
	wnew->q_nfsrv = wnew;
	if (rnew->q_flag & _QINSERTING) {
		/* Redirect queues behind prev_wq that do not point at themselves. */
		walk = prev_wq;
		while (walk != NULL && walk->q_nfsrv != walk) {
			walk->q_nfsrv = wnew->q_nfsrv;
			walk = backq(walk);
		}
	}
}
/*
 * Repoint q_nfsrv pointers that refer to the queue pair (rqp, wqp) so
 * they no longer reference it.
 *
 * Write side: only when rqp has _QREMOVING set and wqp has a service
 * procedure; queues reached via backq(wqp) that point at wqp are given
 * wqp's own q_nfsrv.
 * Read side: only when rqp has a service procedure and wqp has a next
 * queue; queues reached from _OTHERQ(wqp->q_next) via backq() that point
 * at rqp are given rqp->q_next's q_nfsrv.
 */
void
reset_nfsrv_ptr(queue_t *rqp, queue_t *wqp)
{
	queue_t *walk;

	if ((rqp->q_flag & _QREMOVING) && (wqp->q_qinfo->qi_srvp != NULL)) {
		walk = backq(wqp);
		while (walk != NULL && walk->q_nfsrv == wqp) {
			walk->q_nfsrv = wqp->q_nfsrv;
			walk = backq(walk);
		}
	}

	if (!rqp->q_qinfo->qi_srvp || !wqp->q_next)
		return;

	for (walk = _OTHERQ(wqp->q_next);
	    walk && walk->q_nfsrv == rqp;
	    walk = backq(walk)) {
		ASSERT(rqp->q_next != NULL);
		walk->q_nfsrv = rqp->q_next->q_nfsrv;
	}
}
/*
 * Compute the stream head's sd_struiowrq and sd_struiordq pointers.
 *
 * For a stream with STPLEX set both are NULL.  Otherwise the write queues
 * below the stream head are scanned twice, once looking at each write
 * queue's q_struiot and once at the q_struiot of its read partner:
 *   - STRUIOT_NONE stops the scan with a NULL result;
 *   - any value other than STRUIOT_DONTCARE selects that queue;
 *   - STRUIOT_DONTCARE continues downstream, unless _SAMESTR() is false
 *     for the queue, which gives a NULL result.
 */
static void
strsetuio(stdata_t *stp)
{
	queue_t *wq;

	if (stp->sd_flag & STPLEX) {
		stp->sd_struiowrq = NULL;
		stp->sd_struiordq = NULL;
		return;
	}

	/* Write side. */
	for (wq = stp->sd_wrq->q_next; wq != NULL; wq = wq->q_next) {
		if (wq->q_struiot == STRUIOT_NONE) {
			wq = NULL;
			break;
		}
		if (wq->q_struiot != STRUIOT_DONTCARE)
			break;
		if (!_SAMESTR(wq)) {
			wq = NULL;
			break;
		}
	}
	stp->sd_struiowrq = wq;

	/* Read side: walk the write queues but test the read partner. */
	for (wq = stp->sd_wrq->q_next; wq != NULL; wq = wq->q_next) {
		if (_RD(wq)->q_struiot == STRUIOT_NONE) {
			wq = NULL;
			break;
		}
		if (_RD(wq)->q_struiot != STRUIOT_DONTCARE)
			break;
		if (!_SAMESTR(wq)) {
			wq = NULL;
			break;
		}
	}
	if (wq != NULL) {
		stp->sd_struiordq = _RD(wq);
	} else {
		stp->sd_struiordq = NULL;
	}
}
/*
 * Read-side put procedure that simply forwards the message to the next
 * queue with putnext().  Always returns 0.
 */
static int
pass_rput(queue_t *q, mblk_t *mp)
{
putnext(q, mp);
return (0);
}
/*
 * Write-side put procedure that forwards the message to the next queue.
 * If the syncq of the read-side partner has SQ_BLOCKED set, it is
 * unblocked first.  Always returns 0.
 */
static int
pass_wput(queue_t *q, mblk_t *mp)
{
	syncq_t *rsq = _RD(q)->q_syncq;

	if (rsq->sq_flags & SQ_BLOCKED) {
		unblocksq(rsq, SQ_BLOCKED, 0);
	}
	putnext(q, mp);
	return (0);
}
/*
 * Allocate a pass-through queue pair (passthru_rinit/passthru_winit),
 * attach it to stream stpdown and insert it into that stream.
 *
 * The new pair's syncq is blocked (SQ_BLOCKED) before insertion; pass_wput()
 * in this file unblocks it.  Returns the read queue of the new pair.
 */
static queue_t *
link_addpassthru(stdata_t *stpdown)
{
queue_t *passq;
sqlist_t sqlist;
passq = allocq();
STREAM(passq) = STREAM(_WR(passq)) = stpdown;
/* Initialise the pair as a per-queue (QPERQ) perimeter before claiming it. */
setq(passq, &passthru_rinit, &passthru_winit, NULL, QPERQ,
SQ_CI|SQ_CO, B_FALSE);
claimq(passq);
blocksq(passq->q_syncq, SQ_BLOCKED, 1);
insertq(STREAM(passq), passq);
/*
 * Lock and immediately unlock the stream with a one-element syncq list
 * (the stream head read queue's syncq).
 * NOTE(review): presumably this waits for the stream to quiesce - confirm.
 */
sqlist.sqlist_head = NULL;
sqlist.sqlist_index = 0;
sqlist.sqlist_size = sizeof (sqlist_t);
sqlist_insert(&sqlist, _RD(stpdown->sd_wrq)->q_syncq);
strlock(stpdown, &sqlist);
strunlock(stpdown, &sqlist);
releaseq(passq);
return (passq);
}
/*
 * Remove a pass-through queue pair from its stream and free it.
 * The queue is claimed around removeq() and released before freeq().
 */
static void
link_rempassthru(queue_t *passq)
{
claimq(passq);
removeq(passq);
releaseq(passq);
freeq(passq);
}
/*
 * Wait on condition variable cvp (protected by mutex mp).
 *
 *   tim < 0   wait with no timeout
 *   tim > 0   wait at most tim milliseconds (rounded up to clock ticks)
 *   tim == 0  do not wait at all; return -1
 *
 * nosigs selects the non-signal variants of the wait primitives; otherwise
 * the *_sig variants are used.
 *
 * Returns -1 when tim is 0, 1 after an untimed wait with nosigs set, and
 * otherwise the return value of the underlying cv_* call.
 */
clock_t
str_cv_wait(kcondvar_t *cvp, kmutex_t *mp, clock_t tim, int nosigs)
{
	if (tim == 0)
		return (-1);

	if (tim < 0) {
		if (!nosigs)
			return (cv_wait_sig(cvp, mp));
		cv_wait(cvp, mp);
		return (1);
	}

	/* Timed wait. */
	if (nosigs) {
		return (cv_reltimedwait(cvp, mp,
		    MSEC_TO_TICK_ROUNDUP(tim), TR_CLOCK_TICK));
	}
	return (cv_reltimedwait_sig(cvp, mp,
	    MSEC_TO_TICK_ROUNDUP(tim), TR_CLOCK_TICK));
}
/*
 * Report whether the stream attached to vp is at a mark.
 *
 * While the stream head read queue is empty and none of STRATMARK,
 * STRNOTATMARK or STREOF is set, RSLEEP is set and the caller sleeps on the
 * read queue for up to 100ms (signals not observed).  If that wait returns
 * -1 the function returns -1.
 *
 * Returns 1 if STRATMARK is set or the first message on the read queue has
 * MSGMARK set, 0 otherwise, -1 as described above.
 */
int
strwaitmark(vnode_t *vp)
{
	struct stdata *stp = vp->v_stream;
	queue_t *rq = _RD(stp->sd_wrq);
	int mark;

	mutex_enter(&stp->sd_lock);
	for (;;) {
		if (rq->q_first != NULL)
			break;
		if (stp->sd_flag & (STRATMARK|STRNOTATMARK|STREOF))
			break;

		stp->sd_flag |= RSLEEP;
		if (str_cv_wait(&rq->q_wait, &stp->sd_lock, 100, 1) == -1) {
			mutex_exit(&stp->sd_lock);
			return (-1);
		}
	}

	mark = ((stp->sd_flag & STRATMARK) ||
	    (rq->q_first != NULL && (rq->q_first->b_flag & MSGMARK))) ? 1 : 0;

	mutex_exit(&stp->sd_lock);
	return (mark);
}
/*
 * Set or clear the read-side error state of the stream attached to vp.
 *
 *   error    value stored in sd_rerror
 *   persist  non-zero clears STRDERRNONPERSIST, zero sets it
 *   errfunc  stored in sd_rderrfunc
 *
 * STRDERR is cleared only when error is 0 and errfunc is NULL.  When an
 * error or an error function is being installed, sleepers on the read
 * queue, the write queue and sd_monitor are woken, pollers get POLLERR
 * (sd_lock is dropped around pollwakeup()), and S_ERROR is signalled if
 * requested in sd_sigflags.
 */
void
strsetrerror(vnode_t *vp, int error, int persist, errfunc_t errfunc)
{
	struct stdata *stp = vp->v_stream;
	int haserr = (error != 0 || errfunc != NULL);

	mutex_enter(&stp->sd_lock);
	stp->sd_rerror = error;
	if (haserr) {
		stp->sd_flag |= STRDERR;
	} else {
		stp->sd_flag &= ~STRDERR;
	}
	if (!persist) {
		stp->sd_flag |= STRDERRNONPERSIST;
	} else {
		stp->sd_flag &= ~STRDERRNONPERSIST;
	}
	stp->sd_rderrfunc = errfunc;

	if (!haserr) {
		mutex_exit(&stp->sd_lock);
		return;
	}

	cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
	cv_broadcast(&stp->sd_wrq->q_wait);
	cv_broadcast(&stp->sd_monitor);

	/* pollwakeup() is called without sd_lock. */
	mutex_exit(&stp->sd_lock);
	pollwakeup(&stp->sd_pollist, POLLERR);
	mutex_enter(&stp->sd_lock);

	if (stp->sd_sigflags & S_ERROR)
		strsendsig(stp->sd_siglist, S_ERROR, 0, error);
	mutex_exit(&stp->sd_lock);
}
/*
 * Set or clear the write-side error state of the stream attached to vp.
 *
 *   error    value stored in sd_werror
 *   persist  non-zero clears STWRERRNONPERSIST, zero sets it
 *   errfunc  stored in sd_wrerrfunc
 *
 * STWRERR is cleared only when error is 0 and errfunc is NULL.  When an
 * error or an error function is being installed, sleepers on the read
 * queue, the write queue and sd_monitor are woken, pollers get POLLERR
 * (sd_lock is dropped around pollwakeup()), and S_ERROR is signalled if
 * requested in sd_sigflags.
 */
void
strsetwerror(vnode_t *vp, int error, int persist, errfunc_t errfunc)
{
	struct stdata *stp = vp->v_stream;
	int haserr = (error != 0 || errfunc != NULL);

	mutex_enter(&stp->sd_lock);
	stp->sd_werror = error;
	if (haserr) {
		stp->sd_flag |= STWRERR;
	} else {
		stp->sd_flag &= ~STWRERR;
	}
	if (!persist) {
		stp->sd_flag |= STWRERRNONPERSIST;
	} else {
		stp->sd_flag &= ~STWRERRNONPERSIST;
	}
	stp->sd_wrerrfunc = errfunc;

	if (!haserr) {
		mutex_exit(&stp->sd_lock);
		return;
	}

	cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
	cv_broadcast(&stp->sd_wrq->q_wait);
	cv_broadcast(&stp->sd_monitor);

	/* pollwakeup() is called without sd_lock. */
	mutex_exit(&stp->sd_lock);
	pollwakeup(&stp->sd_pollist, POLLERR);
	mutex_enter(&stp->sd_lock);

	if (stp->sd_sigflags & S_ERROR)
		strsendsig(stp->sd_siglist, S_ERROR, 0, error);
	mutex_exit(&stp->sd_lock);
}
/*
 * Set (eof != 0) or clear (eof == 0) the STREOF flag on the stream
 * attached to vp.
 *
 * When setting: a reader marked by RSLEEP is woken, pollers get
 * POLLIN|POLLRDNORM (sd_lock is dropped around pollwakeup()), and
 * S_INPUT|S_RDNORM is signalled if either is requested in sd_sigflags.
 */
void
strseteof(vnode_t *vp, int eof)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);
	if (eof) {
		stp->sd_flag |= STREOF;
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		}

		/* pollwakeup() is called without sd_lock. */
		mutex_exit(&stp->sd_lock);
		pollwakeup(&stp->sd_pollist, POLLIN|POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & (S_INPUT|S_RDNORM))
			strsendsig(stp->sd_siglist, S_INPUT|S_RDNORM, 0, 0);
	} else {
		stp->sd_flag &= ~STREOF;
	}
	mutex_exit(&stp->sd_lock);
}
/*
 * Flush the stream head read queue of the stream attached to vp.
 * flag is passed through to flushq(); sd_lock is held across the call.
 */
void
strflushrq(vnode_t *vp, int flag)
{
struct stdata *stp = vp->v_stream;
mutex_enter(&stp->sd_lock);
flushq(_RD(stp->sd_wrq), flag);
mutex_exit(&stp->sd_lock);
}
/*
 * Install read-side put hooks and options on the stream attached to vp.
 *
 *   protofunc  stored in sd_rprotofunc; NULL selects strrput_proto
 *   miscfunc   stored in sd_rmiscfunc; NULL selects strrput_misc
 *   flags      SH_CONSOL_DATA, SH_SIGALLDATA and SH_IGN_ZEROLEN each set
 *              (when present) or clear (when absent) the matching SR_*
 *              bit in sd_rput_opt
 *
 * All updates are made under sd_lock.
 */
void
strsetrputhooks(vnode_t *vp, uint_t flags,
    msgfunc_t protofunc, msgfunc_t miscfunc)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);

	stp->sd_rprotofunc = (protofunc != NULL) ? protofunc : strrput_proto;
	stp->sd_rmiscfunc = (miscfunc != NULL) ? miscfunc : strrput_misc;

	if (flags & SH_CONSOL_DATA) {
		stp->sd_rput_opt |= SR_CONSOL_DATA;
	} else {
		stp->sd_rput_opt &= ~SR_CONSOL_DATA;
	}

	if (flags & SH_SIGALLDATA) {
		stp->sd_rput_opt |= SR_SIGALLDATA;
	} else {
		stp->sd_rput_opt &= ~SR_SIGALLDATA;
	}

	if (flags & SH_IGN_ZEROLEN) {
		stp->sd_rput_opt |= SR_IGN_ZEROLEN;
	} else {
		stp->sd_rput_opt &= ~SR_IGN_ZEROLEN;
	}

	mutex_exit(&stp->sd_lock);
}
/*
 * Install write-side options on the stream attached to vp.
 *
 *   closetime  stored in sd_closetime
 *   flags      SH_SIGPIPE and SH_RECHECK_ERR each set (when present) or
 *              clear (when absent) the matching SW_* bit in sd_wput_opt
 *
 * All updates are made under sd_lock.
 */
void
strsetwputhooks(vnode_t *vp, uint_t flags, clock_t closetime)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);

	stp->sd_closetime = closetime;

	if (flags & SH_SIGPIPE) {
		stp->sd_wput_opt |= SW_SIGPIPE;
	} else {
		stp->sd_wput_opt &= ~SW_SIGPIPE;
	}

	if (flags & SH_RECHECK_ERR) {
		stp->sd_wput_opt |= SW_RECHECK_ERR;
	} else {
		stp->sd_wput_opt &= ~SW_RECHECK_ERR;
	}

	mutex_exit(&stp->sd_lock);
}
/*
 * Store the read-side and write-side data hook functions (sd_rputdatafunc
 * and sd_wputdatafunc) on the stream attached to vp, under sd_lock.
 * The values are stored as given; NULL is not replaced by a default here.
 */
void
strsetrwputdatahooks(vnode_t *vp, msgfunc_t rdatafunc, msgfunc_t wdatafunc)
{
struct stdata *stp = vp->v_stream;
mutex_enter(&stp->sd_lock);
stp->sd_rputdatafunc = rdatafunc;
stp->sd_wputdatafunc = wdatafunc;
mutex_exit(&stp->sd_lock);
}
/*
 * Enable queue q so that its service procedure gets run.
 * The caller must hold QLOCK(q).
 *
 * Nothing is done if the queue has no service procedure, or if QWCLOSE or
 * QENAB is already set.  Otherwise QENAB is set and, unless the queue is
 * currently in service, the queue is appended to its stream's run list and
 * the stream is scheduled on streams_taskq if nobody has already promised
 * to service it.
 */
void
qenable_locked(queue_t *q)
{
stdata_t *stp = STREAM(q);
ASSERT(MUTEX_HELD(QLOCK(q)));
if (!q->q_qinfo->qi_srvp)
return;
/* Already enabled, or closing: do not queue it. */
if (q->q_flag & (QWCLOSE|QENAB))
return;
q->q_flag |= QENAB;
/* Currently being serviced: only the QENAB flag is set, no enqueue. */
if (q->q_flag & QINSERVICE)
return;
/* Record when the queue was enabled. */
q->q_qtstamp = ddi_get_lbolt();
mutex_enter(&stp->sd_qlock);
/* If work is already listed, someone must already intend to service it. */
IMPLY(STREAM_NEEDSERVICE(stp),
(stp->sd_svcflags & (STRS_WILLSERVICE | STRS_SCHEDULED)));
ENQUEUE(q, stp->sd_qhead, stp->sd_qtail, q_link);
stp->sd_nqueues++;
/* Nobody will service this stream: schedule it ourselves. */
if (!(stp->sd_svcflags & (STRS_WILLSERVICE | STRS_SCHEDULED))) {
STRSTAT(stenables);
stp->sd_svcflags |= STRS_SCHEDULED;
stp->sd_servid = (void *)taskq_dispatch(streams_taskq,
(task_func_t *)stream_service, stp, TQ_NOSLEEP|TQ_NOQUEUE);
if (stp->sd_servid == NULL) {
/*
 * Dispatch failed: undo STRS_SCHEDULED and move the queue from the
 * stream's list to the global qhead/qtail list, then signal
 * services_to_run.
 */
STRSTAT(taskqfails);
stp->sd_svcflags &= ~STRS_SCHEDULED;
mutex_enter(&service_queue);
/* q must be the only entry on the stream's list at this point. */
ASSERT((stp->sd_qhead == q) && (stp->sd_qtail == q));
ASSERT(q->q_link == NULL);
if (qhead == NULL)
qhead = q;
else
qtail->q_link = q;
qtail = q;
stp->sd_qhead = stp->sd_qtail = NULL;
stp->sd_nqueues = 0;
cv_signal(&services_to_run);
mutex_exit(&service_queue);
}
}
mutex_exit(&stp->sd_qlock);
}
/*
 * Service one enabled queue: under QLOCK(q) swap QENAB for QINSERVICE,
 * then call runservice() without the lock.
 * The queue must have QENAB set and QINSERVICE clear on entry.
 */
static void
queue_service(queue_t *q)
{
ASSERT(!(q->q_flag & QINSERVICE));
ASSERT((q->q_flag & QENAB));
mutex_enter(QLOCK(q));
q->q_flag &= ~QENAB;
q->q_flag |= QINSERVICE;
mutex_exit(QLOCK(q));
runservice(q);
}
/*
 * Service routine for a syncq: mark it as being serviced (SQ_SERVICE) and
 * drain it.
 *
 * SQLOCK(sq) is taken here and not released in this function; it is
 * still held when drain_syncq() is called.
 */
static void
syncq_service(syncq_t *sq)
{
STRSTAT(syncqservice);
mutex_enter(SQLOCK(sq));
ASSERT(!(sq->sq_svcflags & SQ_SERVICE));
ASSERT(sq->sq_servcount != 0);
ASSERT(sq->sq_next == NULL);
/* Clear the background-thread marker if it was set. */
if (sq->sq_svcflags & SQ_BGTHREAD)
sq->sq_svcflags &= ~SQ_BGTHREAD;
sq->sq_svcflags |= SQ_SERVICE;
drain_syncq(sq);
}
/*
 * Service routine for an outer perimeter: enter it with
 * SQ_BLOCKED|SQ_WRITER and immediately exit again.
 * NOTE(review): presumably outer_exit() runs any pending writer work -
 * confirm against outer_exit().
 */
static void
qwriter_outer_service(syncq_t *outer)
{
outer_enter(outer, SQ_BLOCKED|SQ_WRITER);
outer_exit(outer);
}
/*
 * Release a message block whose data block carries a free routine.
 *
 * Calls the routine recorded in the dblk's db_frtnp (free_func with
 * free_arg), drops any credential reference, resets db_cpid,
 * db_struioflag and the checksum flags, and returns the dblk to the kmem
 * cache recorded in db_cache.
 */
static void
mblk_free(mblk_t *mp)
{
dblk_t *dbp = mp->b_datap;
frtn_t *frp = dbp->db_frtnp;
mp->b_next = NULL;
/* Release flow-trace data, if any, before the dblk goes away. */
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
ASSERT(dbp->db_fthdr == NULL);
/* Invoke the owner's free callback. */
frp->free_func(frp->free_arg);
ASSERT(dbp->db_mblk == mp);
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
/* Reset per-use fields before handing the dblk back to its cache. */
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
kmem_cache_free(dbp->db_cache, dbp);
}
/*
 * Task function dispatched by qenable_locked() to service a stream.
 *
 * Under sd_qlock: runs STR_SERVICE(stp, q) (macro defined elsewhere;
 * presumably services the queues on the stream's run list - confirm),
 * then clears STRS_SCHEDULED and sd_servid and signals sd_qcv.
 */
static void
stream_service(stdata_t *stp)
{
queue_t *q;
mutex_enter(&stp->sd_qlock);
STR_SERVICE(stp, q);
stp->sd_svcflags &= ~STRS_SCHEDULED;
stp->sd_servid = NULL;
cv_signal(&stp->sd_qcv);
mutex_exit(&stp->sd_qlock);
}
/*
 * Service a stream's enabled queues from the calling thread.
 *
 * Under sd_qlock: STRS_WILLSERVICE is set for the duration of
 * STR_SERVICE(stp, q) (macro defined elsewhere) and cleared afterwards.
 * While the flag is set, qenable_locked() does not schedule the stream.
 */
void
stream_runservice(stdata_t *stp)
{
queue_t *q;
mutex_enter(&stp->sd_qlock);
STRSTAT(rservice);
stp->sd_svcflags |= STRS_WILLSERVICE;
STR_SERVICE(stp, q);
stp->sd_svcflags &= ~STRS_WILLSERVICE;
mutex_exit(&stp->sd_qlock);
}
/*
 * Set STRS_WILLSERVICE on the stream under sd_qlock.  While the flag is
 * set, qenable_locked() does not schedule the stream for servicing.
 */
void
stream_willservice(stdata_t *stp)
{
mutex_enter(&stp->sd_qlock);
stp->sd_svcflags |= STRS_WILLSERVICE;
mutex_exit(&stp->sd_qlock);
}
/*
 * Attach credential cr and process id cpid to the data block of mp.
 *
 * If cr differs from the credential already stored, a hold is taken on cr
 * and the previous credential (if any) is released.  cpid is stored
 * unless it is NOPID.  cr must not be NULL.
 */
void
mblk_setcred(mblk_t *mp, cred_t *cr, pid_t cpid)
{
	dblk_t *dbp = mp->b_datap;
	cred_t *oldcr = dbp->db_credp;

	ASSERT(cr != NULL);

	if (oldcr != cr) {
		dbp->db_credp = cr;
		crhold(cr);
		if (oldcr != NULL) {
			crfree(oldcr);
		}
	}

	/* NOPID never overwrites a stored pid. */
	if (cpid != NOPID) {
		dbp->db_cpid = cpid;
	}
}
/*
 * Copy the credential and process id found on message src (via
 * msg_getcred()) to the data block of mp.
 *
 * Nothing is changed when src carries no credential.  Otherwise, if the
 * credential differs from the one already stored, a hold is taken on the
 * new one and the previous one (if any) is released.  The pid is stored
 * unless it is NOPID.
 */
void
mblk_copycred(mblk_t *mp, const mblk_t *src)
{
	dblk_t *dbp = mp->b_datap;
	pid_t cpid;
	cred_t *newcr = msg_getcred(src, &cpid);
	cred_t *oldcr;

	if (newcr == NULL)
		return;

	oldcr = dbp->db_credp;
	if (oldcr != newcr) {
		dbp->db_credp = newcr;
		crhold(newcr);
		if (oldcr != NULL) {
			crfree(oldcr);
		}
	}

	/* NOPID never overwrites a stored pid. */
	if (cpid != NOPID) {
		dbp->db_cpid = cpid;
	}
}
/*
 * Record LSO information on an M_DATA message: OR flags into the
 * message's LSO flags and store mss as its LSO MSS.
 * flags may only contain bits from HW_LSO_FLAGS (asserted).
 */
void
lso_info_set(mblk_t *mp, uint32_t mss, uint32_t flags)
{
ASSERT(DB_TYPE(mp) == M_DATA);
ASSERT((flags & ~HW_LSO_FLAGS) == 0);
DB_LSOFLAGS(mp) |= flags;
DB_LSOMSS(mp) = mss;
}
/*
 * Remove LSO information from an M_DATA message: clear all HW_LSO_FLAGS
 * bits from its LSO flags and set its LSO MSS to 0.
 */
void
lso_info_cleanup(mblk_t *mp)
{
ASSERT(DB_TYPE(mp) == M_DATA);
DB_LSOFLAGS(mp) &= ~HW_LSO_FLAGS;
DB_LSOMSS(mp) = 0;
}
/*
 * Add the len bytes at bp to the partial checksum psum and return the new
 * partial checksum.
 *
 * The bulk of the work is done by ip_ocsum() on 16-bit words; this routine
 * handles a buffer that starts on an odd address and/or has an odd length
 * by adding the stray leading/trailing byte itself, with the byte position
 * chosen according to _LITTLE_ENDIAN.
 *
 * When bp is 16-bit aligned and len is even, the ip_ocsum() result is
 * returned directly.  In all other cases the sum is folded once
 * (high 16 bits added to low 16 bits) before it is returned.
 */
unsigned
bcksum(uchar_t *bp, int len, unsigned int psum)
{
	extern unsigned int ip_ocsum();
	int odd = len & 1;
	int misaligned = (((intptr_t)bp & 1) != 0);

	/* Aligned, even length: ip_ocsum() can do it all. */
	if (!misaligned && !odd)
		return (ip_ocsum((ushort_t *)bp, len >> 1, psum));

	if (!misaligned) {
		/* Aligned start: sum the words, then add the trailing byte. */
		psum = ip_ocsum((ushort_t *)bp, len >> 1, psum);
		if (odd) {
			bp += len - 1;
#ifdef _LITTLE_ENDIAN
			psum += *bp;
#else
			psum += *bp << 8;
#endif
		}
	} else {
		unsigned int tsum;

		/* Odd start address: account for the leading byte first. */
#ifdef _LITTLE_ENDIAN
		psum += *bp;
#else
		psum += *bp << 8;
#endif
		len--;
		bp++;

		/*
		 * Sum the now-aligned remainder separately and add it with
		 * its two low bytes exchanged.
		 */
		tsum = ip_ocsum((ushort_t *)bp, len >> 1, 0);
		psum += ((tsum << 8) & 0xffff) | (tsum >> 8);

		/* A byte is still left over after the word sum. */
		if (len & 1) {
			bp += len - 1;
#ifdef _LITTLE_ENDIAN
			psum += *bp << 8;
#else
			psum += *bp;
#endif
		}
	}

	/* Fold the carries back into the low 16 bits (single pass). */
	return ((psum >> 16) + (psum & 0xFFFF));
}
/*
 * Free every message on a b_next-linked chain starting at mp.
 * Each message is unlinked (b_next set to NULL) before freemsg() is
 * called on it.  A NULL mp is a no-op.
 */
void
freemsgchain(mblk_t *mp)
{
	mblk_t *cur, *rest;

	for (cur = mp; cur != NULL; cur = rest) {
		rest = cur->b_next;
		cur->b_next = NULL;
		freemsg(cur);
	}
}
/*
 * Duplicate a b_next-linked chain of messages using copymsg() on each
 * element, preserving order.
 *
 * Returns the head of the new chain, or NULL if mp is NULL or if any
 * copymsg() call fails; on failure the copies made so far are freed.
 */
mblk_t *
copymsgchain(mblk_t *mp)
{
	mblk_t *head = NULL;
	mblk_t **link = &head;

	while (mp != NULL) {
		*link = copymsg(mp);
		if (*link == NULL) {
			freemsgchain(head);
			return (NULL);
		}
		link = &(*link)->b_next;
		mp = mp->b_next;
	}
	return (head);
}
/*
 * Function form of QLOCK: any macro of the same name is removed first so
 * that a real function can be defined.  Returns the address of the
 * queue's q_lock mutex.
 */
#undef QLOCK
kmutex_t *
QLOCK(queue_t *q)
{
return (&(q)->q_lock);
}
/*
 * Empty function form of runqueues; any macro of the same name is removed
 * first.  It does nothing.
 * NOTE(review): presumably kept so existing callers still link - confirm.
 */
#undef runqueues
void
runqueues(void)
{
}
/*
 * Empty function form of queuerun; any macro of the same name is removed
 * first.  It does nothing.
 * NOTE(review): presumably kept so existing callers still link - confirm.
 */
#undef queuerun
void
queuerun(void)
{
}
/*
 * Netstack "init" callback: allocate and initialise the per-stack STREAMS
 * state for netstack ns.
 *
 * The structure is zero-filled, sad_initspace() is run on it, and an array
 * of devcnt mux_node entries is allocated with each entry's mn_imaj set to
 * its index.  devcnt is copied into ss_devcnt so the same count is used
 * when the array is freed.  Allocations use KM_SLEEP.
 *
 * Returns the new str_stack_t.  stackid is not used.
 */
static void *
str_stack_init(netstackid_t stackid, netstack_t *ns)
{
	str_stack_t *ss = kmem_zalloc(sizeof (str_stack_t), KM_SLEEP);
	int maj;

	ss->ss_netstack = ns;
	sad_initspace(ss);

	ss->ss_devcnt = devcnt;
	ss->ss_mux_nodes = kmem_zalloc((sizeof (struct mux_node) *
	    ss->ss_devcnt), KM_SLEEP);

	maj = 0;
	while (maj < ss->ss_devcnt) {
		ss->ss_mux_nodes[maj].mn_imaj = maj;
		maj++;
	}
	return (ss);
}
/*
 * Netstack "shutdown" callback: undo persistent links and release the
 * per-stack autopush and mux_node state.
 *
 * For every major number whose mux_node has an outgoing edge, the edge's
 * device is opened through the LDI with the zone's kernel credential and
 * an I_PUNLINK ioctl with MUXID_ALL is issued.  Failures at any step are
 * ignored and the loop moves on to the next major.  Afterwards the
 * credential is released, sad_freespace() is called and the mux_node
 * array is freed (ss_mux_nodes is set to NULL).  The str_stack_t itself
 * is not freed here.
 */
static void
str_stack_shutdown(netstackid_t stackid, void *arg)
{
	str_stack_t *ss = arg;
	cred_t *cr = zone_get_kcred(netstackid_to_zoneid(stackid));
	int maj;

	ASSERT(cr != NULL);

	for (maj = 0; maj < ss->ss_devcnt; maj++) {
		struct mux_edge *edge = ss->ss_mux_nodes[maj].mn_outp;
		ldi_handle_t lh;
		ldi_ident_t li;
		dev_t rdev;
		int rval;

		if (edge == NULL)
			continue;
		if (ldi_ident_from_major((major_t)maj, &li) != 0)
			continue;

		rdev = edge->me_dev;
		if (ldi_open_by_dev(&rdev, OTYP_CHR, FREAD|FWRITE,
		    cr, &lh, li) == 0) {
			/* The handle is closed whether or not the ioctl works. */
			(void) ldi_ioctl(lh, I_PUNLINK, (intptr_t)MUXID_ALL,
			    FKIOCTL, cr, &rval);
			(void) ldi_close(lh, FREAD|FWRITE, cr);
		}
		ldi_ident_release(li);
	}
	crfree(cr);

	sad_freespace(ss);

	kmem_free(ss->ss_mux_nodes, sizeof (struct mux_node) * ss->ss_devcnt);
	ss->ss_mux_nodes = NULL;
}
/*
 * Netstack "fini" callback: free the per-stack str_stack_t passed as arg.
 * stackid is not used.
 */
static void
str_stack_fini(netstackid_t stackid, void *arg)
{
str_stack_t *ss = (str_stack_t *)arg;
kmem_free(ss, sizeof (*ss));
}