#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>
/*
 * Complete ring state packed into one 64-bit word, so that all four 16-bit
 * fields can be read, compared and replaced together through 'state'.
 */
union ring_state {
struct {
/* Producer reservation index; advanced when an enqueue claims slots. */
uint16_t pidx_head;
/* Producer publish index; advanced once the claimed slots are filled. */
uint16_t pidx_tail;
/* Consumer index; advanced by the drain loop. */
uint16_t cidx;
/* One of IDLE, BUSY, STALLED or ABDICATED. */
uint16_t flags;
};
/* The same storage viewed as a single word. */
uint64_t state;
};
/* Values stored in the 'flags' field of union ring_state. */
enum {
	/* Nothing published is waiting to be consumed. */
	IDLE = 0,
	/* A consumer is running the drain loop. */
	BUSY = 1,
	/* The drain callback consumed nothing; items are still queued. */
	STALLED = 2,
	/* Items are queued but the consumer role was handed off. */
	ABDICATED = 3,
};
/*
 * Number of slots a producer could still reserve given ring state 's'.
 *
 * One slot is always left unused: an empty ring (cidx == pidx_head) reports
 * r->size - 1 free slots, never r->size.
 */
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	const uint16_t usable = r->size - 1;

	/* Consumer is ahead of the reservation index: free run lies between. */
	if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);

	/* Reservation index is ahead: the free region wraps around the end. */
	if (s.cidx < s.pidx_head)
		return (usable - s.pidx_head + s.cidx);

	/* Indices are equal: nothing is reserved. */
	return (usable);
}
/*
 * Return 'idx' advanced by 'n' slots, wrapping to the start of the ring when
 * the end (r->size) is reached.  The result is wrapped at most once.
 */
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	/* Slots left between idx and the end of the ring. */
	const int to_end = r->size - idx;

	MPASS(to_end > 0);
	if (n < to_end)
		return (idx + n);
	return (n - to_end);
}
/*
 * Pick the flags value a consumer should publish for state 's'.
 *
 * IDLE when everything published has been consumed; ABDICATED when the
 * caller asks to abdicate and a producer still holds an unpublished
 * reservation; BUSY otherwise.
 */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{
	uint16_t flags = BUSY;

	if (s.cidx == s.pidx_tail)
		flags = IDLE;
	else if (abdicate != 0 && s.pidx_head != s.pidx_tail)
		flags = ABDICATED;

	return (flags);
}
#ifdef MP_RING_NO_64BIT_ATOMICS
/*
 * Consumer loop for the build without 64-bit atomics.  r->state is read and
 * written with plain loads and stores; both callers in this file invoke this
 * function with r->lock held.
 *
 * r      - the ring to drain.
 * os     - ring state as published by the caller; flags must be BUSY and at
 *          least one item must lie between cidx and pidx_tail.
 * prev   - flags value the ring had before the caller marked it BUSY; only
 *          used to decide which statistics counters to bump.
 * budget - once this many items have been consumed, state_to_flags() is
 *          asked to abdicate on every state update.
 *
 * The loop ends when the ring is empty (IDLE), when r->drain() consumes
 * nothing (STALLED), or when state_to_flags() returns ABDICATED.
 */
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
union ring_state ns;
int n, pending, total;
uint16_t cidx = os.cidx;
uint16_t pidx = os.pidx_tail;
MPASS(os.flags == BUSY);
MPASS(cidx != pidx);
/* Leaving IDLE counts as a fresh start. */
if (prev == IDLE)
counter_u64_add(r->starts, 1);
/*
 * pending: items consumed since cidx was last written back to r->state.
 * total:   items consumed by this call.
 */
pending = 0;
total = 0;
while (cidx != pidx) {
/* Offer the items from cidx up to pidx; n is how far cidx may advance. */
n = r->drain(r, cidx, pidx);
if (n == 0) {
/* No progress: publish our cidx and mark the ring STALLED. */
os.state = ns.state = r->state;
ns.cidx = cidx;
ns.flags = STALLED;
r->state = ns.state;
if (prev != STALLED)
counter_u64_add(r->stalls, 1);
else if (total > 0) {
/* Was stalled, made some progress, then stalled again. */
counter_u64_add(r->restarts, 1);
counter_u64_add(r->stalls, 1);
}
break;
}
cidx = increment_idx(r, cidx, n);
pending += n;
total += n;
/*
 * Only write cidx back when we caught up with pidx, when 64 or more
 * items are pending publication, or when the budget is used up.
 */
if (cidx != pidx && pending < 64 && total < budget)
continue;
os.state = ns.state = r->state;
ns.cidx = cidx;
ns.flags = state_to_flags(ns, total >= budget);
r->state = ns.state;
if (ns.flags == ABDICATED)
counter_u64_add(r->abdications, 1);
if (ns.flags != BUSY) {
/* state_to_flags() never returns STALLED; stalls exit above. */
MPASS(ns.flags != STALLED);
if (prev == STALLED) {
MPASS(total > 0);
counter_u64_add(r->restarts, 1);
}
break;
}
/* Still BUSY: pick up anything published since the last look. */
pidx = ns.pidx_tail;
pending = 0;
}
}
#else
/*
 * Consumer loop for the build with 64-bit atomics.  Every change to
 * r->state is made with a compare-and-set loop so that concurrent producers
 * updating pidx_head/pidx_tail are never overwritten.
 *
 * r      - the ring to drain.
 * os     - ring state as published by the caller; flags must be BUSY and at
 *          least one item must lie between cidx and pidx_tail.
 * prev   - flags value the ring had before the caller marked it BUSY; only
 *          used to decide which statistics counters to bump.
 * budget - once this many items have been consumed, state_to_flags() is
 *          asked to abdicate on every state update.
 *
 * The loop ends when the ring is empty (IDLE), when r->drain() consumes
 * nothing (STALLED), or when state_to_flags() returns ABDICATED.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
union ring_state ns;
int n, pending, total;
uint16_t cidx = os.cidx;
uint16_t pidx = os.pidx_tail;
MPASS(os.flags == BUSY);
MPASS(cidx != pidx);
/* Leaving IDLE counts as a fresh start. */
if (prev == IDLE)
counter_u64_add(r->starts, 1);
/*
 * pending: items consumed since cidx was last written back to r->state.
 * total:   items consumed by this call.
 */
pending = 0;
total = 0;
while (cidx != pidx) {
/* Offer the items from cidx up to pidx; n is how far cidx may advance. */
n = r->drain(r, cidx, pidx);
if (n == 0) {
/*
 * No progress: publish our cidx and mark the ring STALLED.
 * NOTE(review): the critical section presumably keeps this thread
 * from being preempted in the middle of the update - confirm.
 */
critical_enter();
os.state = r->state;
do {
/*
 * On failure fcmpset reloads os.state with the current
 * value, so ns is rebuilt from fresh producer indices.
 */
ns.state = os.state;
ns.cidx = cidx;
ns.flags = STALLED;
} while (atomic_fcmpset_64(&r->state, &os.state,
ns.state) == 0);
critical_exit();
if (prev != STALLED)
counter_u64_add(r->stalls, 1);
else if (total > 0) {
/* Was stalled, made some progress, then stalled again. */
counter_u64_add(r->restarts, 1);
counter_u64_add(r->stalls, 1);
}
break;
}
cidx = increment_idx(r, cidx, n);
pending += n;
total += n;
/*
 * Only write cidx back when we caught up with pidx, when 64 or more
 * items are pending publication, or when the budget is used up.
 */
if (cidx != pidx && pending < 64 && total < budget)
continue;
critical_enter();
os.state = r->state;
do {
/* Recomputed on every retry from the state fcmpset handed back. */
ns.state = os.state;
ns.cidx = cidx;
ns.flags = state_to_flags(ns, total >= budget);
} while (atomic_fcmpset_acq_64(&r->state, &os.state,
ns.state) == 0);
critical_exit();
if (ns.flags == ABDICATED)
counter_u64_add(r->abdications, 1);
if (ns.flags != BUSY) {
/* state_to_flags() never returns STALLED; stalls exit above. */
MPASS(ns.flags != STALLED);
if (prev == STALLED) {
MPASS(total > 0);
counter_u64_add(r->restarts, 1);
}
break;
}
/*
 * Still BUSY.  The acquire-style update above is what makes the items
 * behind any newly observed pidx_tail safe to read here.
 */
pidx = ns.pidx_tail;
pending = 0;
}
}
#endif
/*
 * Allocate and initialize a ring with 'size' slots.
 *
 * pr        - receives the new ring on success; set to NULL before any
 *             allocation is attempted.
 * size      - number of slots, 2..65536.
 * cookie    - opaque pointer stored in the ring for the callbacks' use.
 * drain     - callback invoked by the consumer loop; must not be NULL.
 * can_drain - callback consulted before restarting a stalled ring; must
 *             not be NULL.
 * mt        - malloc type the ring is allocated from and later freed to.
 * flags     - only M_NOWAIT / M_WAITOK are honoured; one must be set.
 *
 * Returns 0 on success, EINVAL for bad arguments, or ENOMEM if the ring or
 * any of its counters cannot be allocated.
 */
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *ring;

	if (pr == NULL || drain == NULL || can_drain == NULL)
		return (EINVAL);
	/* Ring indices are 16 bits wide, which caps the slot count. */
	if (size < 2 || size > 65536)
		return (EINVAL);

	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	/* Header plus 'size' trailing item slots, zero-filled. */
	ring = malloc(__offsetof(struct ifmp_ring, items[size]), mt,
	    flags | M_ZERO);
	if (ring == NULL)
		return (ENOMEM);

	ring->size = size;
	ring->cookie = cookie;
	ring->mt = mt;
	ring->drain = drain;
	ring->can_drain = can_drain;

	ring->enqueues = counter_u64_alloc(flags);
	ring->drops = counter_u64_alloc(flags);
	ring->starts = counter_u64_alloc(flags);
	ring->stalls = counter_u64_alloc(flags);
	ring->restarts = counter_u64_alloc(flags);
	ring->abdications = counter_u64_alloc(flags);

	if (ring->enqueues != NULL && ring->drops != NULL &&
	    ring->starts != NULL && ring->stalls != NULL &&
	    ring->restarts != NULL && ring->abdications != NULL) {
		*pr = ring;
#ifdef MP_RING_NO_64BIT_ATOMICS
		mtx_init(&ring->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
		return (0);
	}

	/* At least one counter is missing; release whatever was obtained. */
	ifmp_ring_free(ring);
	return (ENOMEM);
}
/*
 * Release a ring obtained from ifmp_ring_alloc(), including its statistics
 * counters and, in the mutex-based build, its lock.  A NULL ring is ignored.
 *
 * Also used by ifmp_ring_alloc() to unwind a partially constructed ring, so
 * every member is checked before it is released.
 */
void
ifmp_ring_free(struct ifmp_ring *r)
{
	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

#ifdef MP_RING_NO_64BIT_ATOMICS
	/*
	 * Pair the mtx_init() done by ifmp_ring_alloc().  The allocation
	 * error path gets here before mtx_init() with the ring still
	 * zero-filled, so only destroy a lock that was really initialized.
	 */
	if (mtx_initialized(&r->lock))
		mtx_destroy(&r->lock);
#endif

	free(r, r->mt);
}
#ifdef MP_RING_NO_64BIT_ATOMICS
/*
 * Enqueue 'n' items on the ring (build without 64-bit atomics; the whole
 * operation runs under r->lock).
 *
 * r        - the ring.
 * items    - array of at least n item pointers; must not be NULL.
 * n        - number of items to enqueue; must be > 0.
 * budget   - passed to the drain loop if this call becomes the consumer.
 * abdicate - when non-zero this call never drains; an IDLE ring is marked
 *            ABDICATED instead so that ifmp_ring_check_drainage() can pick
 *            the work up.
 *
 * Returns 0 on success, or ENOBUFS (after counting n drops) when n is not
 * strictly smaller than the space available.
 */
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
union ring_state os, ns;
uint16_t pidx_start, pidx_stop;
int i;
MPASS(items != NULL);
MPASS(n > 0);
mtx_lock(&r->lock);
/* Reserve room for the new items: [pidx_start, pidx_stop). */
os.state = r->state;
if (n >= space_available(r, os)) {
counter_u64_add(r->drops, n);
MPASS(os.flags != IDLE);
mtx_unlock(&r->lock);
/* A full, stalled ring gets a chance to restart its consumer. */
if (os.flags == STALLED)
ifmp_ring_check_drainage(r, 0);
return (ENOBUFS);
}
ns.state = os.state;
ns.pidx_head = increment_idx(r, os.pidx_head, n);
r->state = ns.state;
pidx_start = os.pidx_head;
pidx_stop = ns.pidx_head;
/*
 * Wait until pidx_tail reaches the start of our reservation, i.e. until
 * earlier reservations have been published.
 */
while (ns.pidx_tail != pidx_start) {
cpu_spinwait();
ns.state = r->state;
}
/* Copy the caller's items into the reserved slots, wrapping at r->size. */
i = pidx_start;
do {
r->items[i] = *items++;
if (__predict_false(++i == r->size))
i = 0;
} while (i != pidx_stop);
/* Publish the items by moving pidx_tail to the end of the reservation. */
os.state = ns.state = r->state;
ns.pidx_tail = pidx_stop;
if (abdicate) {
/* Any other flags value is left as it is. */
if (os.flags == IDLE)
ns.flags = ABDICATED;
} else
ns.flags = BUSY;
r->state = ns.state;
counter_u64_add(r->enqueues, n);
if (!abdicate) {
/* Become the consumer unless the ring was already BUSY. */
if (os.flags != BUSY)
drain_ring_locked(r, ns, os.flags, budget);
}
mtx_unlock(&r->lock);
return (0);
}
#else
/*
 * Enqueue 'n' items on the ring (build with 64-bit atomics; producers
 * coordinate through compare-and-set on r->state).
 *
 * r        - the ring.
 * items    - array of at least n item pointers; must not be NULL.
 * n        - number of items to enqueue; must be > 0.
 * budget   - passed to the drain loop if this call becomes the consumer.
 * abdicate - when non-zero this call never drains; an IDLE ring is marked
 *            ABDICATED instead so that ifmp_ring_check_drainage() can pick
 *            the work up.
 *
 * Returns 0 on success, or ENOBUFS (after counting n drops) when n is not
 * strictly smaller than the space available.
 */
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
union ring_state os, ns;
uint16_t pidx_start, pidx_stop;
int i;
MPASS(items != NULL);
MPASS(n > 0);
/*
 * Reserve room for the new items by advancing pidx_head.  A failed
 * fcmpset reloads os.state, so the space check is redone on each retry.
 */
os.state = r->state;
for (;;) {
if (n >= space_available(r, os)) {
counter_u64_add(r->drops, n);
MPASS(os.flags != IDLE);
/* A full, stalled ring gets a chance to restart its consumer. */
if (os.flags == STALLED)
ifmp_ring_check_drainage(r, 0);
return (ENOBUFS);
}
ns.state = os.state;
ns.pidx_head = increment_idx(r, os.pidx_head, n);
/*
 * On success we leave the loop still inside the critical section; it
 * is exited only after pidx_tail has been published below.
 * NOTE(review): presumably so a producer holding a reservation is not
 * preempted while later producers spin on it - confirm.
 */
critical_enter();
if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
break;
critical_exit();
cpu_spinwait();
}
/* Our reservation is [pidx_start, pidx_stop). */
pidx_start = os.pidx_head;
pidx_stop = ns.pidx_head;
/*
 * Wait for producers that reserved ahead of us to publish their items;
 * it is our turn once pidx_tail reaches the start of our reservation.
 */
while (ns.pidx_tail != pidx_start) {
cpu_spinwait();
ns.state = r->state;
}
/* Copy the caller's items into the reserved slots, wrapping at r->size. */
i = pidx_start;
do {
r->items[i] = *items++;
if (__predict_false(++i == r->size))
i = 0;
} while (i != pidx_stop);
/*
 * Publish the items by moving pidx_tail to the end of the reservation.
 * The release-style update orders the item stores above before the new
 * pidx_tail becomes visible.
 */
os.state = r->state;
do {
ns.state = os.state;
ns.pidx_tail = pidx_stop;
if (abdicate) {
/* Any other flags value is left as it is. */
if (os.flags == IDLE)
ns.flags = ABDICATED;
} else
ns.flags = BUSY;
} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
critical_exit();
counter_u64_add(r->enqueues, n);
if (!abdicate) {
/* Become the consumer unless the ring was already BUSY. */
if (os.flags != BUSY)
drain_ring_lockless(r, ns, os.flags, budget);
}
return (0);
}
#endif
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
union ring_state os, ns;
os.state = r->state;
if ((os.flags != STALLED && os.flags != ABDICATED) ||
os.pidx_head != os.pidx_tail ||
(os.flags != ABDICATED && r->can_drain(r) == 0))
return;
MPASS(os.cidx != os.pidx_tail);
ns.state = os.state;
ns.flags = BUSY;
#ifdef MP_RING_NO_64BIT_ATOMICS
mtx_lock(&r->lock);
if (r->state != os.state) {
mtx_unlock(&r->lock);
return;
}
r->state = ns.state;
drain_ring_locked(r, ns, os.flags, budget);
mtx_unlock(&r->lock);
#else
if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
return;
drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
/*
 * Zero all six statistics counters of the ring.  The ring's contents and
 * state word are not touched.
 */
void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{
/* Bumped by ifmp_ring_enqueue(): items accepted / items rejected. */
counter_u64_zero(r->enqueues);
counter_u64_zero(r->drops);
/* Bumped by the drain loop as the consumer changes state. */
counter_u64_zero(r->starts);
counter_u64_zero(r->stalls);
counter_u64_zero(r->restarts);
counter_u64_zero(r->abdications);
}
int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
union ring_state s;
s.state = r->state;
if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
s.flags == IDLE)
return (1);
return (0);
}
int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
union ring_state s;
s.state = r->state;
if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
return (1);
return (0);
}