#define NED(_fmt, ...) do {} while (0)
#define ED(_fmt, ...) \
do { \
struct timeval _t0; \
gettimeofday(&_t0, NULL); \
fprintf(stderr, "%03d.%03d %-10.10s [%5d] \t" _fmt "\n", \
(int)(_t0.tv_sec % 1000), (int)_t0.tv_usec/1000, \
__FUNCTION__, __LINE__, ##__VA_ARGS__); \
} while (0)
#define WWW(_fmt, ...) ED("--WWW-- " _fmt, ##__VA_ARGS__)
#define EEE(_fmt, ...) ED("--EEE-- " _fmt, ##__VA_ARGS__)
#define DDD(_fmt, ...) ED("--DDD-- " _fmt, ##__VA_ARGS__)
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <libnetmap.h>
#include <math.h>
#include <pthread.h>
#ifdef __FreeBSD__
#include <pthread_np.h>
#include <sys/cpuset.h>
#endif
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/poll.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>
/*
 * Header stored in front of every packet in the queue buffer.
 * The packet payload follows immediately after this header
 * (see enq(), which copies the data to p + 1).
 */
struct q_pkt {
uint64_t next; /* offset in the queue buffer of the next packet header */
uint64_t pktlen; /* packet length, copied from q->cur_len by enq() */
uint64_t pt_qout; /* copy of q->qt_qout at enqueue time */
uint64_t pt_tx; /* copy of q->qt_tx; cons() sends the packet once this time is reached */
};
/*
 * Global header found at the start of a pcap file.
 * In this file only its size is used (to locate the first packet record)
 * and the leading magic word is read to detect byte order and timestamp
 * resolution; see readpcap().
 */
struct pcap_file_header {
uint32_t magic; /* identifies byte order and timestamp resolution */
uint16_t version_major;
uint16_t version_minor;
int32_t thiszone;
uint32_t stampacc;
uint32_t snaplen;
uint32_t network;
};
#if 0
struct pcap_file_header {
bpf_u_int32 magic;
u_short version_major;
u_short version_minor;
bpf_int32 thiszone;
bpf_u_int32 sigfigs;
bpf_u_int32 snaplen;
bpf_u_int32 linktype;
};
struct pcap_pkthdr {
struct timeval ts;
bpf_u_int32 caplen;
bpf_u_int32 len;
};
#endif
/*
 * Per-packet record header: four 32-bit fields.
 * readpcap() and pcap_prod() read the same four fields, in this order,
 * with read_next_info() rather than through this struct.
 */
struct pcap_pkthdr {
uint32_t ts_sec; /* timestamp, seconds */
uint32_t ts_frac; /* timestamp, sub-second part (scaled by the file resolution) */
uint32_t caplen; /* number of packet bytes stored in the file */
uint32_t len; /* packet length */
};
#define PKT_PAD (32)
/* Round x up to the next multiple of PKT_PAD (a power of two). */
static inline int pad(int x)
{
	const int low_bits = PKT_PAD - 1;
	const int bumped = x + low_bits;

	return bumped & ~low_bits;
}
/*
 * State of a memory-mapped pcap file, filled in by readpcap()
 * and released by destroy_pcap().
 */
struct nm_pcap_file {
int fd; /* descriptor of the open file */
uint64_t filesize; /* size of the file, also the size of the mapping */
const char *data; /* start of the read-only mapping */
uint64_t tot_pkt; /* number of packet records counted by readpcap() */
uint64_t tot_bytes; /* sum of the len fields of all records */
uint64_t tot_bytes_rounded; /* queue space needed: padded lengths plus one struct q_pkt each */
uint32_t resolution; /* multiplier turning the fractional timestamp into ns (1000 or 1) */
int swap; /* non-zero if header fields must be byte-swapped */
uint64_t first_ts; /* timestamp of the first packet, moved back by the estimated first packet time */
uint64_t total_tx_time; /* last minus first timestamp, plus the estimated first packet time */
uint64_t file_len; /* not used in the code visible here */
const char *cur; /* read cursor used by read_next_info() */
const char *lim; /* end of the readable data */
int err; /* set to 1 by read_next_info() on an attempt to read past lim */
};
/* Forward declarations for the pcap loader and its destructor. */
static struct nm_pcap_file *readpcap(const char *fn);
static void destroy_pcap(struct nm_pcap_file *file);
/* Multiplier applied to the seconds field of a record timestamp (nanoseconds per second). */
#define NS_SCALE 1000000000UL
/*
 * Release everything readpcap() acquired: the file mapping, the
 * descriptor and the descriptor structure itself. The structure is
 * wiped before being freed. A NULL argument is ignored.
 */
static void destroy_pcap(struct nm_pcap_file *pf)
{
	if (pf != NULL) {
		void *mapping = (void *)(uintptr_t)pf->data;

		munmap(mapping, pf->filesize);
		close(pf->fd);
		memset(pf, 0, sizeof(*pf));
		free(pf);
	}
}
/*
 * Read a 2- or 4-byte unsigned integer from (possibly unaligned) memory.
 *
 * src   points to the raw bytes
 * size  number of bytes to read, must be 2 or 4 (anything else is fatal)
 * swap  non-zero if the value is stored in the opposite byte order
 *       from the host and must be byte-swapped
 *
 * The value is loaded into an object of exactly `size` bytes, so the
 * result is correct on both little- and big-endian hosts (copying two
 * bytes into a uint32_t would land them in the wrong half on big-endian).
 */
static uint32_t
cvt(const void *src, int size, char swap)
{
	if (size == 2) {
		uint16_t v16;

		memcpy(&v16, src, sizeof(v16));
		if (swap)
			v16 = (uint16_t)((v16 >> 8) | (v16 << 8));
		return v16;
	}
	if (size == 4) {
		uint32_t v32;

		memcpy(&v32, src, sizeof(v32));
		if (swap)
			v32 = (v32 >> 24) | ((v32 >> 8) & 0x0000ff00U) |
			    ((v32 << 8) & 0x00ff0000U) | (v32 << 24);
		return v32;
	}
	EEE("Invalid size %d\n", size);
	exit(1);
}
/*
 * Fetch the next `size`-byte field at the read cursor and advance the
 * cursor past it. If the field would extend beyond pf->lim, the cursor
 * is left untouched, pf->err is set to 1 and 0 is returned.
 */
static uint32_t
read_next_info(struct nm_pcap_file *pf, int size)
{
	const char *after = pf->cur + size;
	uint32_t value;

	if (after > pf->lim) {
		pf->err = 1;
		return 0;
	}
	value = cvt(pf->cur, size, pf->swap);
	pf->cur = after;
	return value;
}
/*
 * Open and memory-map the pcap file `fn`, detect byte order and
 * timestamp resolution from the magic word, then scan all the records
 * to collect packet count, byte counts and timing information.
 *
 * Returns a heap-allocated descriptor (release it with destroy_pcap())
 * or NULL on error. On every error path the mapping and the descriptor
 * opened here are released.
 *
 * Only complete records are counted: a partial trailing header or a
 * record whose captured data extends past the end of the file ends the
 * scan, and pf->lim is moved back to the end of the last complete
 * record so that later readers stop (and wrap) at a record boundary.
 * A file without any complete record is rejected.
 */
static struct nm_pcap_file *
readpcap(const char *fn)
{
	struct nm_pcap_file _f, *pf = &_f;
	struct nm_pcap_file *ret;
	uint64_t prev_ts, first_pkt_time;
	uint32_t magic, first_len = 0;
	const char *last_end;
	off_t fsize;

	bzero(pf, sizeof(*pf));
	pf->fd = open(fn, O_RDONLY);
	if (pf->fd < 0) {
		EEE("cannot open file %s", fn);
		return NULL;
	}
	/* lseek() returns -1 on failure; do not let it become a huge size */
	fsize = lseek(pf->fd, 0, SEEK_END);
	if (fsize < 0 || lseek(pf->fd, 0, SEEK_SET) < 0) {
		EEE("cannot get size of file %s", fn);
		goto fail_close;
	}
	pf->filesize = (uint64_t)fsize;
	ED("filesize is %lu", (u_long)(pf->filesize));
	if (pf->filesize < sizeof(struct pcap_file_header)) {
		EEE("file too short %s", fn);
		goto fail_close;
	}
	pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
	if (pf->data == MAP_FAILED) {
		EEE("cannot mmap file %s", fn);
		goto fail_close;
	}
	pf->cur = pf->data;
	pf->lim = pf->data + pf->filesize;
	pf->err = 0;
	pf->swap = 0; /* read the magic word in host order first */
	magic = read_next_info(pf, 4);
	ED("magic is 0x%x", magic);
	switch (magic) {
	case 0xa1b2c3d4: /* host order, fractional field scaled by 1000 */
		pf->swap = 0;
		pf->resolution = 1000;
		break;
	case 0xd4c3b2a1: /* swapped, fractional field scaled by 1000 */
		pf->swap = 1;
		pf->resolution = 1000;
		break;
	case 0xa1b23c4d: /* host order, fractional field already in ns */
		pf->swap = 0;
		pf->resolution = 1;
		break;
	case 0x4d3cb2a1: /* swapped, fractional field already in ns */
		pf->swap = 1;
		pf->resolution = 1;
		break;
	default:
		EEE("unknown magic 0x%x", magic);
		goto fail_unmap;
	}
	ED("swap %d res %d\n", pf->swap, (int)pf->resolution);
	pf->cur = pf->data + sizeof(struct pcap_file_header);
	pf->lim = pf->data + pf->filesize;
	pf->err = 0;
	prev_ts = 0;
	last_end = pf->cur;
	while (pf->cur < pf->lim && pf->err == 0) {
		/*
		 * Read the fields one per statement: the order in which
		 * the operands of '+' are evaluated is unspecified, so
		 * both reads must not sit in the same expression.
		 */
		uint64_t ts_sec = read_next_info(pf, 4);
		uint64_t ts_frac = read_next_info(pf, 4);
		uint32_t caplen = read_next_info(pf, 4);
		uint32_t len = read_next_info(pf, 4);
		uint64_t cur_ts = ts_sec * NS_SCALE + ts_frac * pf->resolution;

		if (pf->err) {
			WWW("end of pcap file after %d packets\n",
				(int)pf->tot_pkt);
			break;
		}
		if (caplen > (uint64_t)(pf->lim - pf->cur)) {
			WWW("truncated packet %d, ignoring the rest of the file",
				(int)pf->tot_pkt);
			break;
		}
		if (cur_ts < prev_ts) {
			WWW("reordered packet %d\n",
				(int)pf->tot_pkt);
		}
		prev_ts = cur_ts;
		if (pf->tot_pkt == 0) {
			pf->first_ts = cur_ts;
			first_len = len;
		}
		pf->tot_pkt++;
		pf->tot_bytes += len;
		pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
		pf->cur += caplen;
		last_end = pf->cur;
	}
	if (pf->tot_pkt == 0) {
		/* nothing to replay; callers divide by tot_pkt */
		EEE("no packets in file %s", fn);
		goto fail_unmap;
	}
	/* readers must stop at the end of the last complete record */
	pf->lim = last_end;
	pf->total_tx_time = prev_ts - pf->first_ts;
	ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
		(u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
		1e-9*pf->total_tx_time, (u_long)first_len);
	/*
	 * Estimate the transmission time of the first packet from the
	 * average rate of the remaining ones; with a single packet
	 * fall back to 8 time units per byte.
	 */
	if (pf->tot_bytes == first_len) {
		first_pkt_time = first_len * 8;
	} else {
		first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
	}
	ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
	pf->total_tx_time += first_pkt_time;
	pf->first_ts -= first_pkt_time;

	ret = calloc(1, sizeof(*ret));
	if (ret == NULL) {
		EEE("out of memory reading %s", fn);
		goto fail_unmap;
	}
	*ret = _f;
	ret->cur = ret->data + sizeof(struct pcap_file_header);
	ret->err = 0;
	return ret;

fail_unmap:
	munmap((void *)(uintptr_t)pf->data, pf->filesize);
fail_close:
	close(pf->fd);
	return NULL;
}
enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };
/* verbosity level, incremented by each -v on the command line */
static int verbose = 0;
/*
 * Termination request flag. It is set from the SIGINT handler, so it
 * must have type volatile sig_atomic_t, the only object type a signal
 * handler may portably write to.
 */
static volatile sig_atomic_t do_abort = 0;
/*
 * Platform glue for CPU affinity and for the 64-bit print type.
 * `linux` is only predefined in GNU dialects; strict ISO modes
 * (e.g. -std=c11) define __linux__ only, so both are tested.
 */
#if defined(linux) || defined(__linux__)
#define cpuset_t cpu_set_t
#endif
#ifdef __APPLE__
/* Minimal replacements: a cpu set is a 64-bit mask. */
#define cpuset_t uint64_t
static inline void CPU_ZERO(cpuset_t *p)
{
	*p = 0;
}
static inline void CPU_SET(uint32_t i, cpuset_t *p)
{
	/* the constant must be 64 bits wide: shifting an int by 31 or more is undefined */
	*p |= 1ULL << (i & 0x3f);
}
#define pthread_setaffinity_np(a, b, c) ((void)a, 0)
#define sched_setscheduler(a, b, c) (1)
#define clock_gettime(a,b) \
	do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
#define _P64 unsigned long
#endif
#ifndef _P64
#define _P64 uint64_t
#endif
struct _qs;
/*
 * One emulation option (delay, bandwidth or loss).
 * cmd_apply() calls parse() on a scratch copy with arg, arg_len and d[]
 * cleared, and copies it to the destination when parse() returns 0.
 */
struct _cfg {
/* returns 0 on success, 1 on bad arguments, 2 if the option name does not match */
int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);
/* invoked once per packet by pcap_prod() */
int (*run)(struct _qs *, struct _cfg *arg);
const char *optarg; /* help text in the tables; replaced by the user argument once applied */
void *arg; /* optional table allocated by parse() */
int arg_len; /* size in bytes of the table pointed to by arg */
uint64_t d[16]; /* integer parameters, meaning defined by each option */
double f[4]; /* floating point parameters, meaning defined by each option */
};
#define INFINITE_BW (200ULL*1000000*1000)
#define MY_CACHELINE (128ULL)
/* upper bound on packet length used to size the table built by const_ber_parse() */
#define MAX_PKT (9200)
/* align a field to MY_CACHELINE bytes */
#define ALIGN_CACHE __attribute__ ((aligned (MY_CACHELINE)))
/*
 * Queue and per-packet state shared by the producer (pcap_prod) and the
 * consumer (cons). Several fields are not referenced by the code visible
 * in this file and are left undocumented.
 */
struct _qs {
uint64_t t0; /* time origin: set_tns_now() results are relative to it */
uint64_t buflen; /* size in bytes of buf */
char *buf; /* queue storage: a sequence of struct q_pkt headers each followed by data */
struct _cfg c_delay; /* active delay emulation */
struct _cfg c_bw; /* active bandwidth emulation */
struct _cfg c_loss; /* active loss emulation */
/* producer section */
uint64_t tx ALIGN_CACHE; /* count of enqueued packets */
uint64_t prod_tail_1;
uint64_t prod_head;
uint64_t prod_tail; /* offset in buf where the next packet is stored */
uint64_t prod_drop;
uint64_t prod_max_gap;
struct nm_pcap_file *pcap; /* source capture, set by nmreplay_main() */
struct nmport_d *src_port;
const char * prod_ifname; /* holds the pcap file name (assigned in main()) */
struct netmap_ring *rxring;
uint32_t si;
int burst; /* cons() issues a TXSYNC after more than this many injected packets */
uint32_t rx_qmax;
uint64_t qt_qout; /* running sum of cur_tt over the packets that were not dropped */
uint64_t qt_tx; /* transmit time assigned to the most recent packet */
/* description of the packet being processed, see the run functions */
const char * cur_pkt; /* packet data inside the mapped file */
uint32_t cur_len; /* len field of the record */
uint32_t cur_caplen; /* caplen field of the record */
int cur_drop; /* set by the loss function: non-zero drops the packet */
uint64_t cur_tt; /* transmission time of the packet, may be rewritten by the bw function */
uint64_t cur_delay; /* extra delay, set by the delay function */
const char * cons_ifname; /* netmap port name opened for output */
/* consumer section */
uint64_t rx ALIGN_CACHE; /* count of packets injected by cons() */
uint64_t cons_head; /* offset in buf of the next packet to send */
uint64_t cons_tail; /* offset in buf where the queued data ends */
uint64_t cons_now; /* most recent time read by cons(), relative to t0 */
uint64_t rx_wait; /* number of times cons() found the next packet not yet due */
volatile uint64_t _tail ALIGN_CACHE ; /* published by pcap_prod() when the queue is complete */
volatile uint64_t _head ALIGN_CACHE ;
};
/* Arguments handed to the worker thread started by main(). */
struct pipe_args {
int wait_link; /* value of the -w option */
pthread_t cons_tid; /* id of the thread running nmreplay_main() */
pthread_t prod_tid;
int cons_core; /* core passed to setaffinity() by nmreplay_main() */
int prod_core;
struct nmport_d *pa;
struct nmport_d *pb; /* output netmap port, opened by nmreplay_main() */
struct _qs q; /* queue and emulation state */
};
/* nanoseconds in one second; all internal times are expressed in ns */
#define NS_IN_S (1000000000ULL)
#define TIME_UNITS NS_IN_S
/*
 * Pin the calling thread to core `i` and try to raise its priority
 * (nice -10, SCHED_RR with priority 10). With i == -1 nothing is done.
 * Failures are only reported as warnings; the function always returns 0.
 */
static int
setaffinity(int i)
{
	cpuset_t cpumask;
	struct sched_param p;
	int rc;

	if (i == -1)
		return 0;
	CPU_ZERO(&cpumask);
	CPU_SET(i, &cpumask);
	/*
	 * pthread_setaffinity_np() returns the error number and does
	 * not set errno, so report the returned value.
	 */
	rc = pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask);
	if (rc != 0) {
		WWW("Unable to set affinity: %s", strerror(rc));
	}
	if (setpriority(PRIO_PROCESS, 0, -10)) {
		WWW("Unable to set priority: %s", strerror(errno));
	}
	bzero(&p, sizeof(p));
	p.sched_priority = 10;
	if (sched_setscheduler(0, SCHED_RR, &p)) {
		WWW("Unable to set scheduler: %s", strerror(errno));
	}
	return 0;
}
/*
 * Store in *now the current CLOCK_REALTIME time, expressed in
 * nanoseconds and made relative to the origin t0.
 */
static inline void
set_tns_now(uint64_t *now, uint64_t t0)
{
	struct timespec ts;
	uint64_t abs_ns;

	clock_gettime(CLOCK_REALTIME, &ts);
	abs_ns = (uint64_t)(ts.tv_nsec + NS_IN_S * ts.tv_sec);
	*now = abs_ns - t0;
}
/*
 * Compare two timestamps: the result is positive when a is after b,
 * negative when a is before b, zero when equal. The difference is
 * computed modulo 2^64 and then reinterpreted as signed.
 */
static inline int64_t
ts_cmp(uint64_t a, uint64_t b)
{
	const uint64_t delta = a - b;

	return (int64_t)delta;
}
/* Return the packet header located `ofs` bytes into the queue buffer. */
static inline struct q_pkt *
pkt_at(struct _qs *q, uint64_t ofs)
{
	char *where = q->buf + ofs;

	return (struct q_pkt *)where;
}
/*
 * Append the current packet (q->cur_pkt, q->cur_len, q->cur_caplen)
 * at q->prod_tail, together with its scheduling times, then advance
 * the tail and count the packet in q->tx. Always returns 0.
 *
 * The slot is sized from cur_len (padded), while the file stores
 * cur_caplen bytes. At most cur_len bytes are copied, so a record
 * with caplen > len cannot write beyond its slot; when caplen < len
 * the remaining bytes of the slot are left as they are.
 */
static inline int
enq(struct _qs *q)
{
	struct q_pkt *p = pkt_at(q, q->prod_tail);
	uint32_t copy_len = q->cur_caplen < q->cur_len ?
	    q->cur_caplen : q->cur_len;

	nm_pkt_copy(q->cur_pkt, (char *)(p+1), copy_len);
	p->pktlen = q->cur_len;
	p->pt_qout = q->qt_qout;
	p->pt_tx = q->qt_tx;
	p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
	ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
		q->cur_len, (int)q->prod_tail, p->next,
		1e-9*p->pt_qout, 1e-9*p->pt_tx);
	q->prod_tail = p->next;
	q->tx++;
	return 0;
}
/*
 * Default per-packet hook installed when an option is not configured:
 * it leaves the queue state untouched and reports success.
 */
static int
null_run_fn(struct _qs *q, struct _cfg *cfg)
{
	(void)cfg;
	(void)q;

	return 0;
}
/*
 * Build the in-memory packet queue from the pcap file in pa->q.pcap.
 *
 * The file is replayed enough times to queue at least 10000 packets.
 * For each record the loss, bandwidth and delay functions are run and,
 * unless the packet is dropped, it is enqueued with its transmit time.
 * When done, the final tail offset is published in q->_tail.
 *
 * On failure (empty capture or allocation error) q->buf is left NULL
 * and do_abort is set so that the consumer does not run on a missing
 * queue. Always returns NULL.
 */
static void *
pcap_prod(void *_pa)
{
	struct pipe_args *pa = _pa;
	struct _qs *q = &pa->q;
	struct nm_pcap_file *pf = q->pcap;
	const char *first_rec;
	uint32_t loops, i, tot_pkts;
	uint64_t need;
	uint64_t t_tx, tt, last_ts;

	if (pf->tot_pkt == 0) {
		/* avoid the division by zero below */
		EEE("no packets to replay");
		goto fail;
	}
	loops = (1 + 10000 / pf->tot_pkt);
	tot_pkts = loops * pf->tot_pkt;
	/* one extra header: cons() looks at the slot at the tail offset */
	need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
	q->buf = calloc(1, need);
	if (q->buf == NULL) {
		D("alloc %lld bytes for queue failed, exiting",(long long)need);
		goto fail;
	}
	q->prod_head = q->prod_tail = 0;
	q->buflen = need;

	first_rec = pf->data + sizeof(struct pcap_file_header);
	pf->cur = first_rec;
	pf->err = 0;

	ED("--- start create %lu packets at tail %d",
		(u_long)tot_pkts, (int)q->prod_tail);
	last_ts = pf->first_ts;
	q->qt_qout = 0;
	for (i = 0; i < tot_pkts && !do_abort; i++) {
		const char *next_pkt;
		uint64_t ts_sec, ts_frac, cur_ts;

		/*
		 * One read per statement: the evaluation order of the
		 * operands of '+' is unspecified, so the two timestamp
		 * words must not be read inside a single expression.
		 */
		ts_sec = read_next_info(pf, 4);
		ts_frac = read_next_info(pf, 4);
		q->cur_caplen = read_next_info(pf, 4);
		q->cur_len = read_next_info(pf, 4);
		if (pf->err || q->cur_caplen > (uint64_t)(pf->lim - pf->cur)) {
			/* incomplete record: restart from the first packet */
			pf->cur = first_rec;
			pf->err = 0;
			last_ts = pf->first_ts;
			continue;
		}
		cur_ts = ts_sec * NS_SCALE + ts_frac * pf->resolution;
		next_pkt = pf->cur + q->cur_caplen;
		q->cur_pkt = pf->cur;
		/* default transmission time: gap from the previous packet */
		q->cur_tt = cur_ts - last_ts;
		if ((i % pf->tot_pkt) == 0)
			ED("insert %5d len %lu cur_tt %.6f",
				i, (u_long)q->cur_len, 1e-9*q->cur_tt);

		pf->cur = next_pkt;
		last_ts = cur_ts;
		if (next_pkt >= pf->lim) {
			/* end of the capture: wrap around */
			pf->cur = first_rec;
			last_ts = pf->first_ts;
		}

		q->c_loss.run(q, &q->c_loss);
		if (q->cur_drop)
			continue;
		q->c_bw.run(q, &q->c_bw);
		tt = q->cur_tt;
		q->qt_qout += tt;
#if 0
		if (drop_after(q))
			continue;
#endif
		q->c_delay.run(q, &q->c_delay);
		t_tx = q->qt_qout + q->cur_delay;
		ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
		/* never schedule a packet less than tt after the previous one */
		q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
		enq(q); /* also accounts the packet in q->tx */
	}
	ED("done q->prod_tail:%d",(int)q->prod_tail);
	q->_tail = q->prod_tail;
	return NULL;
fail:
	free(q->buf);
	q->buf = NULL;
	do_abort = 1;
	nmport_close(pa->pb);
	return (NULL);
}
/*
 * Consumer: walk the queue built by pcap_prod() and inject each packet
 * on the output port pa->pb when its transmit time (pt_tx) is reached.
 * When the end of the queue is reached the walk restarts from offset 0,
 * with the time origin moved forward by the transmit time of the last
 * packet sent. Runs until do_abort is set; always returns NULL.
 */
static void *
cons(void *_pa)
{
struct pipe_args *pa = _pa;
struct _qs *q = &pa->q;
int pending = 0; /* packets injected since the last TXSYNC */
uint64_t last_ts = 0; /* pt_tx of the most recently examined packet */
/* take the time origin, then read the current (relative) time */
set_tns_now(&q->t0, 0);
set_tns_now(&q->cons_now, q->t0);
q->cons_head = q->_head;
q->cons_tail = q->_tail;
while (!do_abort) {
struct q_pkt *p = pkt_at(q, q->cons_head);
__builtin_prefetch (q->buf + p->next);
if (q->cons_head == q->cons_tail) {
/* end of the queue: shift the origin and start over */
ND("Transmission restarted");
q->t0 += last_ts;
set_tns_now(&q->cons_now, q->t0);
q->cons_head = 0;
continue;
}
last_ts = p->pt_tx;
if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
/* not due yet: flush what is pending, nap, re-read the clock */
q->rx_wait++;
ioctl(pa->pb->fd, NIOCTXSYNC, 0);
pending = 0;
usleep(20);
set_tns_now(&q->cons_now, q->t0);
continue;
}
if (nmport_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0) {
/* injection failed: sync and retry the same packet */
RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
(int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
(u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
ioctl(pa->pb->fd, NIOCTXSYNC, 0);
pending = 0;
continue;
}
pending++;
if (pending > q->burst) {
/* bound the number of packets injected between two syncs */
ioctl(pa->pb->fd, NIOCTXSYNC, 0);
pending = 0;
}
q->cons_head = p->next;
q->rx++;
}
D("exiting on abort");
return NULL;
}
/*
 * Worker thread body: load the pcap file named by q->prod_ifname,
 * build the packet queue, open the output port q->cons_ifname and
 * run the consumer until abort.
 *
 * On every exit path do_abort is set so that main() terminates too.
 * The pcap descriptor, the output port and the queue buffer are
 * released before returning. Always returns NULL.
 */
static void *
nmreplay_main(void *_a)
{
	struct pipe_args *a = _a;
	struct _qs *q = &a->q;
	const char *cap_fname = q->prod_ifname;

	setaffinity(a->cons_core);
	set_tns_now(&q->t0, 0);
	if (cap_fname == NULL) {
		goto fail;
	}
	q->pcap = readpcap(cap_fname);
	if (q->pcap == NULL) {
		EEE("unable to read file %s", cap_fname);
		goto fail;
	}
	pcap_prod((void*)a);
	destroy_pcap(q->pcap);
	q->pcap = NULL;
	if (q->buf == NULL) {
		/* the producer could not build the queue: nothing to send */
		EEE("unable to build the packet queue from %s", cap_fname);
		goto fail;
	}
	a->pb = nmport_open(q->cons_ifname);
	if (a->pb == NULL) {
		EEE("cannot open netmap on %s", q->cons_ifname);
		goto fail;
	}
	WWW("prepare to send packets");
	usleep(1000);
	cons((void*)a);
	EEE("exiting on abort");
	nmport_close(a->pb);
	a->pb = NULL;
fail:
	if (q->pcap != NULL) {
		destroy_pcap(q->pcap);
		q->pcap = NULL;
	}
	free(q->buf);
	q->buf = NULL;
	do_abort = 1;
	return NULL;
}
/*
 * SIGINT handler: request termination through do_abort and restore
 * the default disposition, so that a second SIGINT gets the default
 * action.
 */
static void
sigint_h(int sig)
{
	do_abort = 1;
	signal(SIGINT, SIG_DFL);
	(void)sig;
}
/* Print the command line synopsis on stderr and exit with status 1. */
static void
usage(void)
{
	static const char synopsis[] =
	    "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
	    "\t[-b burst] -f pcap-file -i <netmap:ifname|valeSSS:PPP>\n";

	fputs(synopsis, stderr);
	exit(1);
}
/*
 * Split `src` into tokens separated by blanks, tabs, CR, LF or commas.
 *
 * Returns a single heap block that the caller releases with one
 * free(): an argv-like array of *_ac token pointers, followed by a
 * NULL entry, followed by a pointer to the private copy of the string
 * that holds the tokens (the copy lives in the same block).
 *
 * Returns NULL if src is NULL or if memory cannot be allocated; in
 * those cases *_ac is not modified.
 */
static char **
split_arg(const char *src, int *_ac)
{
	static const char seps[] = " \t\r\n,";
	char *my, *tok, **av;
	const char *scan;
	size_t len;
	int ac = 0, n = 0;

	if (!src)
		return NULL;
	len = strlen(src);

	/* first pass: count the tokens */
	scan = src + strspn(src, seps);
	while (*scan != '\0') {
		ac++;
		scan += strcspn(scan, seps);
		scan += strspn(scan, seps);
	}

	/* ac pointers + NULL + pointer to the copy, then the copy itself */
	av = calloc(1, (len + 1) + (ac + 2) * sizeof(char *));
	if (av == NULL)
		return NULL;
	my = (char *)&(av[ac + 2]);
	memcpy(my, src, len + 1);

	/* second pass: record each token and terminate it in the copy */
	tok = my + strspn(my, seps);
	while (*tok != '\0') {
		av[n++] = tok;
		tok += strcspn(tok, seps);
		if (*tok != '\0') {
			*tok++ = '\0';
			tok += strspn(tok, seps);
		}
	}
	av[ac] = NULL;
	av[ac + 1] = my;
	*_ac = ac;
	return av;
}
/*
 * Configure one emulation feature from its command line argument.
 *
 * a    table of candidate options, terminated by an entry whose
 *      parse function is NULL
 * arg  the option argument as typed by the user (e.g. "uniform,1m,5m")
 * q    queue state, passed through to the parse functions
 * dst  configuration slot to fill in on success
 *
 * Each candidate's parse() is tried in turn on the split argument:
 * 2 means "not my option", 1 means "mine, but bad arguments",
 * 0 means success, in which case *dst is overwritten.
 *
 * Returns 0 on success, 1 on error. A NULL or empty `arg` means the
 * option was not given on the command line: the defaults already in
 * *dst are kept and 0 is returned (the options are optional, and
 * main() terminates on a non-zero return).
 */
static int
cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
{
	int ac = 0;
	char **av;
	int i;

	if (arg == NULL || *arg == '\0')
		return 0; /* option not supplied, keep the defaults */
	if (a == NULL || dst == NULL) {
		ED("program error - invalid arguments");
		exit(1);
	}
	av = split_arg(arg, &ac);
	if (av == NULL)
		return 1;
	if (ac == 0) {
		/* only separators: the parse functions need av[0] */
		ED("arguments %s not recognised", arg);
		free(av);
		return 1;
	}
	for (i = 0; a[i].parse; i++) {
		struct _cfg x = a[i];
		const char *errmsg = x.optarg;
		int ret;

		x.arg = NULL;
		x.arg_len = 0;
		bzero(&x.d, sizeof(x.d));
		ND("apply %s to %s", av[0], errmsg);
		ret = x.parse(q, &x, ac, av);
		if (ret == 2) /* not recognised, try the next one */
			continue;
		if (ret == 1) {
			ED("invalid arguments: need '%s' have '%s'",
				errmsg, arg);
			break;
		}
		x.optarg = arg; /* remember what the user typed */
		*dst = x;
		/* the parse functions keep numbers, not pointers into av */
		free(av);
		return 0;
	}
	ED("arguments %s not recognised", arg);
	free(av);
	return 1;
}
/* Option tables, defined after the parse/run functions they refer to. */
static struct _cfg delay_cfg[];
static struct _cfg bw_cfg[];
static struct _cfg loss_cfg[];
static uint64_t parse_bw(const char *arg);
/*
 * Store `arg` in the first free (NULL) slot of the array `v`, which
 * has `l` entries. If no slot is free, print `msg` followed by `arg`
 * and exit with status 1.
 */
static void
add_to(const char ** v, int l, const char *arg, const char *msg)
{
	while (l > 0 && *v != NULL) {
		l--;
		v++;
	}
	if (l == 0) {
		ED("%s %s", msg, arg);
		exit(1);
	}
	*v = arg;
}
/*
 * Program entry point: parse the command line, configure the delay,
 * bandwidth and loss emulation, start the worker thread running
 * nmreplay_main() and print statistics once per second until
 * do_abort is set.
 */
int
main(int argc, char **argv)
{
int ch, i, err=0;
#define N_OPTS 1
struct pipe_args bp[N_OPTS];
const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
const char *pcap_file[N_OPTS];
int cores[4] = { 2, 8, 4, 10 };
bzero(&bp, sizeof(bp));
bzero(d, sizeof(d));
bzero(b, sizeof(b));
bzero(l, sizeof(l));
bzero(q, sizeof(q));
bzero(m, sizeof(m));
bzero(ifname, sizeof(ifname));
bzero(pcap_file, sizeof(pcap_file));
/* defaults: burst of 128 and no-op delay, loss and bandwidth functions */
for (i = 0; i < N_OPTS; i++) {
struct _qs *qs = &bp[i].q;
qs->burst = 128;
qs->c_delay.optarg = "0";
qs->c_delay.run = null_run_fn;
qs->c_loss.optarg = "0";
qs->c_loss.run = null_run_fn;
qs->c_bw.optarg = "0";
qs->c_bw.run = null_run_fn;
}
while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
switch (ch) {
default:
D("bad option %c %s", ch, optarg);
usage();
break;
case 'C':
{
/* 1, 2 or 4 core numbers; missing ones are derived by adding 1 */
int ac = 0;
char **av = split_arg(optarg, &ac);
if (ac == 1) {
cores[0] = atoi(av[0]);
cores[1] = cores[0] + 1;
cores[2] = cores[1] + 1;
cores[3] = cores[2] + 1;
} else if (ac == 2) {
cores[0] = atoi(av[0]);
cores[1] = cores[0] + 1;
cores[2] = atoi(av[1]);
cores[3] = cores[2] + 1;
} else if (ac == 4) {
cores[0] = atoi(av[0]);
cores[1] = atoi(av[1]);
cores[2] = atoi(av[2]);
cores[3] = atoi(av[3]);
} else {
ED(" -C accepts 1, 2 or 4 comma separated arguments");
usage();
}
if (av)
free(av);
}
break;
/* -B, -D, -L, -f and -i may each be given at most N_OPTS times */
case 'B':
add_to(b, N_OPTS, optarg, "-B too many times");
break;
case 'D':
add_to(d, N_OPTS, optarg, "-D too many times");
break;
case 'L':
add_to(l, N_OPTS, optarg, "-L too many times");
break;
case 'b':
bp[0].q.burst = atoi(optarg);
break;
case 'f':
add_to(pcap_file, N_OPTS, optarg, "-f too many times");
break;
case 'i':
add_to(ifname, N_OPTS, optarg, "-i too many times");
break;
case 'v':
verbose++;
break;
case 'w':
bp[0].wait_link = atoi(optarg);
break;
}
}
argc -= optind;
argv += optind;
/* the pcap file and the output interface are mandatory */
if (!pcap_file[0]) {
ED("missing pcap file");
usage();
}
if (!ifname[0]) {
ED("missing interface");
usage();
}
/* out-of-range values are replaced, not rejected */
if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
WWW("invalid burst %d, set to 1024", bp[0].q.burst);
bp[0].q.burst = 1024;
}
if (bp[0].wait_link > 100) {
ED("invalid wait_link %d, set to 4", bp[0].wait_link);
bp[0].wait_link = 4;
}
bp[0].q.prod_ifname = pcap_file[0];
bp[0].q.cons_ifname = ifname[0];
bp[0].cons_core = cores[0];
bp[0].prod_core = cores[1];
ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
/* any non-zero result from cmd_apply() terminates the program */
for (i = 0; i < N_OPTS; i++) {
struct _qs *qs = &bp[i].q;
err += cmd_apply(delay_cfg, d[i], qs, &qs->c_delay);
err += cmd_apply(bw_cfg, b[i], qs, &qs->c_bw);
err += cmd_apply(loss_cfg, l[i], qs, &qs->c_loss);
if (err != 0)
exit(1);
}
pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
signal(SIGINT, sigint_h);
sleep(1);
/* once per second report the counters accumulated during the interval */
while (!do_abort) {
struct _qs olda = bp[0].q;
struct _qs *q0 = &bp[0].q;
sleep(1);
ED("%lld -> %lld maxq %d round %lld",
(long long)(q0->rx - olda.rx), (long long)(q0->tx - olda.tx),
q0->rx_qmax, (long long)q0->prod_max_gap
);
ED("plr nominal %le actual %le",
(double)(q0->c_loss.d[0])/(1<<24),
q0->c_loss.d[1] == 0 ? 0 :
(double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
/* decay the recorded maxima by 1/8 at every report */
bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8;
bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8;
}
D("exiting on abort");
sleep(1);
return (0);
}
/* Unit suffix table entry: any character of `s` selects multiplier `m`. */
struct _sm {
	const char *s;
	double m;
};
/*
 * Parse a floating point number with an optional one-character unit
 * suffix.
 *
 * arg   string to parse
 * conv  suffix table terminated by an entry with s == NULL, or NULL
 *       if no suffix is allowed. The character following the number
 *       is looked up with strchr() in each entry in turn, so the end
 *       of string matches the first entry.
 * err   if not NULL, set to 0 on success and 1 on error
 *
 * Returns the number multiplied by the matching multiplier, or 0 on
 * error (NULL or non-numeric argument, unknown suffix, or any
 * trailing character when conv is NULL).
 */
static double
parse_gen(const char *arg, const struct _sm *conv, int *err)
{
	double d;
	char *ep;
	int dummy;

	if (err == NULL)
		err = &dummy;
	*err = 0;
	if (arg == NULL)
		goto error;
	d = strtod(arg, &ep);
	if (ep == arg) { /* no digits consumed */
		ED("bad argument %s", arg);
		goto error;
	}
	if (conv == NULL) {
		/* plain number: a trailing suffix is an error, not a lookup */
		if (*ep != '\0')
			goto error;
		ND("returning %lf", d);
		return d;
	}
	ND("checking %s [%s]", arg, ep);
	for (; conv->s; conv++) {
		if (strchr(conv->s, *ep)) {
			ND("scale is %s %lf", conv->s, conv->m);
			d *= conv->m;
			ND("returning %lf", d);
			return d;
		}
	}
error:
	*err = 1;
	return 0;
}
/* Value returned by parse_time() and parse_bw() on error. */
#define U_PARSE_ERR (~(0ULL))
/*
 * Parse a time value and return it in nanoseconds.
 * Accepted suffixes: n (ns), u (us), m (ms), s (s); a number without
 * suffix is taken as seconds.
 * Returns U_PARSE_ERR on a syntax error and for values that cannot be
 * represented (negative, NaN or too large): converting such a double
 * to uint64_t would be undefined behavior.
 */
static uint64_t
parse_time(const char *arg)
{
	static const struct _sm a[] = {
		{"", 1000000000 },
		{"n", 1 }, {"u", 1000 },
		{"m", 1000000 }, {"s", 1000000000 },
		{NULL, 0 }
	};
	int err;
	double v = parse_gen(arg, a, &err);

	/* !(v >= 0) also rejects NaN; the upper bound is 2^64 */
	if (err || !(v >= 0) || v >= 18446744073709551616.0)
		return U_PARSE_ERR;
	return (uint64_t)v;
}
/*
 * Parse a bandwidth and return it in bits per second.
 * Accepted suffixes: b/B (bit/s), k/K (10^3), m/M (10^6), g/G (10^9);
 * a number without suffix is in bits per second.
 * Returns U_PARSE_ERR on a syntax error and for values that cannot be
 * represented (negative, NaN or too large): converting such a double
 * to uint64_t would be undefined behavior.
 */
static uint64_t
parse_bw(const char *arg)
{
	static const struct _sm a[] = {
		{"", 1}, {"bB", 1}, {"kK", 1000}, {"mM", 1000000},
		{"gG", 1000000000}, {NULL, 0}
	};
	int err;
	double v = parse_gen(arg, a, &err);

	/* !(v >= 0) also rejects NaN; the upper bound is 2^64 */
	if (err || !(v >= 0) || v >= 18446744073709551616.0)
		return U_PARSE_ERR;
	return (uint64_t)v;
}
/* Return the low 24 bits of random(), i.e. a value in [0, 2^24 - 1]. */
static inline uint64_t
my_random24(void)
{
	const int low24 = (1<<24) - 1;

	return random() & low24;
}
#if 0
Here we place the functions to implement the various features
of the system. For each feature one should define a struct _cfg
(see at the beginning for definition) that refers a *_parse() function
to extract values from the command line, and a *_run() function
that is invoked on each packet to implement the desired function.
Examples of the two functions are below. In general:
- the *_parse() function takes argc/argv[], matches the function
name in argv[0], extracts the operating parameters, allocates memory
if needed, and stores them in the struct _cfg.
Return value is 2 if argv[0] is not recognised, 1 if there is an
error in the arguments, 0 if all ok.
On the command line, argv[] is a single, comma separated argument
that follows the specific option, e.g. -D constant,20ms
struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
function can use that without having to allocate memory.
- the *_run() function takes struct _qs *q and struct _cfg *cfg as arguments.
*q contains all the information that may be possibly needed, including
those on the packet currently under processing.
The basic values are the following:
char * cur_pkt points to the current packet (linear buffer)
uint32_t cur_len; length of the current packet
the functions are not supposed to modify these values
int cur_drop; true if current packet must be dropped.
Must be set to non-zero by the loss emulation function
uint64_t cur_delay; delay in nanoseconds for the current packet
Must be set by the delay emulation function
More sophisticated functions may need to access other fields in *q,
see the structure description for that.
When implementing a new function for a feature (e.g. for delay,
bandwidth, loss...) the struct _cfg should be added to the array
that contains all possible options.
--- Specific notes ---
DELAY emulation -D option_arguments
If the option is not supplied, the system applies 0 extra delay
The resolution for times is 1ns, the precision is load dependent and
generally in the order of 20-50us.
Times are stored in nanoseconds. A number without a unit is taken as
seconds (see parse_time); it can be followed by a character specifying
a different unit e.g.
n nanoseconds
u microseconds
m milliseconds
s seconds
Currently implemented options:
constant,t constant delay equal to t
uniform,tmin,tmax uniform delay between tmin and tmax
exp,tavg,tmin exponential distribution with average tavg
and minimum tmin (corresponds to an exponential
distribution with argument 1/(tavg-tmin) )
LOSS emulation -L option_arguments
Loss is expressed as packet or bit error rate, which is an absolute
number between 0 and 1 (typically small).
Currently implemented options
plr,p uniform packet loss rate p, independent
of packet size
burst,p,lmin,lmax burst loss with burst probability p and
burst length uniformly distributed between
lmin and lmax
(no entry for burst exists in loss_cfg[] in this file)
ber,p uniformly distributed bit error rate p,
so actual loss prob. depends on size.
BANDWIDTH emulation -B option_arguments
Bandwidths are expressed in bits per second, can be followed by a
character specifying a different unit e.g.
b/B bits per second
k/K kbits/s (10^3 bits/s)
m/M mbits/s (10^6 bits/s)
g/G gbits/s (10^9 bits/s)
Currently implemented options
const,b constant bw, excluding mac framing
ether,b constant bw, including ethernet framing
(20 bytes framing + 4 bytes crc)
real,[scale] use real time, optionally with a scaling factor
#endif
/*
 * Parse "-D [const...,]time": a fixed delay.
 * With a single token the token itself is the delay; with two tokens
 * the first must start with "const". Returns 2 if the option name does
 * not match, 1 on bad arguments, 0 on success with the delay in
 * dst->d[0].
 */
static int
const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
	uint64_t ns;

	(void)q;
	if (strncmp(av[0], "const", 5) != 0 && ac > 1) {
		return 2;
	}
	if (ac > 2) {
		return 1;
	}
	ns = parse_time(av[ac - 1]);
	if (ns == U_PARSE_ERR) {
		return 1;
	}
	dst->d[0] = ns;
	return 0;
}
/* Per-packet hook for the constant delay: every packet gets arg->d[0]. */
static int
const_delay_run(struct _qs *q, struct _cfg *arg)
{
	const uint64_t fixed_delay = arg->d[0];

	q->cur_delay = fixed_delay;
	return 0;
}
/*
 * Parse "-D uniform,dmin,dmax". Returns 2 if the name is not "uniform",
 * 1 on a wrong argument count, unparsable times or dmin > dmax, and
 * 0 on success with d[0] = dmin, d[1] = dmax, d[2] = dmax - dmin.
 */
static int
uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
	uint64_t lo, hi;

	(void)q;
	if (strcmp(av[0], "uniform") != 0) {
		return 2;
	}
	if (ac != 3) {
		return 1;
	}
	lo = parse_time(av[1]);
	hi = parse_time(av[2]);
	if (lo == U_PARSE_ERR || hi == U_PARSE_ERR || lo > hi) {
		return 1;
	}
	D("dmin %lld dmax %lld", (long long)lo, (long long)hi);
	dst->d[0] = lo;
	dst->d[1] = hi;
	dst->d[2] = hi - lo;
	return 0;
}
/*
 * Per-packet hook for the uniform delay: d[0] plus a 24-bit random
 * fraction of the range d[2].
 */
static int
uniform_delay_run(struct _qs *q, struct _cfg *arg)
{
	uint64_t frac24 = my_random24();
	uint64_t extra = (arg->d[2] * frac24) >> 24;

	q->cur_delay = arg->d[0] + extra;
	return 0;
}
/*
 * Parse "-D exp,davg,dmin": exponentially distributed delay with
 * average davg and minimum dmin (davg >= dmin).
 *
 * A table of PTS_D_EXP samples of the inverse distribution function
 *     delay = dmin - ln(u) * (davg - dmin),   u = (N - i) / N
 * is stored in dst->arg; exp_delay_run() picks one entry at random.
 * The natural logarithm is required here: with a base-2 logarithm the
 * resulting mean would be about 1.44 times the requested one.
 *
 * Returns 2 if the name is not "exp", 1 on bad arguments or allocation
 * failure, 0 on success.
 */
static int
exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
#define PTS_D_EXP 512
	uint64_t i, d_av, d_min, *t;

	(void)q;
	if (strcmp(av[0], "exp") != 0)
		return 2;
	if (ac != 3)
		return 1;
	d_av = parse_time(av[1]);
	d_min = parse_time(av[2]);
	if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
		return 1;
	d_av -= d_min; /* mean of the exponential part only */
	dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
	dst->arg = calloc(1, dst->arg_len);
	if (dst->arg == NULL)
		return 1;
	t = (uint64_t *)dst->arg;
	for (i = 0; i < PTS_D_EXP; i++) {
		double d = -log((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
		t[i] = (uint64_t)d;
		ND(5, "%ld: %le", i, d);
	}
	return 0;
}
/*
 * Per-packet hook for the exponential delay: pick a random entry of
 * the table built by exp_delay_parse().
 */
static int
exp_delay_run(struct _qs *q, struct _cfg *arg)
{
	const uint64_t *samples = (uint64_t *)arg->arg;
	uint64_t slot = my_random24() & (PTS_D_EXP - 1);

	q->cur_delay = samples[slot];
	RD(5, "delay %llu", (unsigned long long)q->cur_delay);
	return 0;
}
/* Initializers for the arg, arg_len, d and f members of a table entry. */
#define TLEM_CFG_END NULL, 0, {0}, {0}
/*
 * Delay options accepted by -D, tried in this order by cmd_apply().
 * The third member is the help text printed when the arguments of a
 * recognised option are invalid, so it must list the arguments in the
 * order the parse function reads them: exp_delay_parse() takes the
 * average first and the minimum second.
 */
static struct _cfg delay_cfg[] = {
	{ const_delay_parse, const_delay_run,
		"constant,delay", TLEM_CFG_END },
	{ uniform_delay_parse, uniform_delay_run,
		"uniform,dmin,dmax # dmin <= dmax", TLEM_CFG_END },
	{ exp_delay_parse, exp_delay_run,
		"exp,davg,dmin # dmin <= davg", TLEM_CFG_END },
	{ NULL, NULL, NULL, TLEM_CFG_END }
};
/*
 * Parse "-B [const...,]bps": constant bandwidth.
 * With two tokens the first must start with "const". A single token
 * that is not a valid bandwidth yields 2 (not recognised) so that the
 * other bandwidth options can be tried; with two tokens a bad value
 * yields 1. On success returns 0 with the bandwidth in dst->d[0].
 */
static int
const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
	uint64_t bps;

	(void)q;
	if (strncmp(av[0], "const", 5) != 0 && ac > 1) {
		return 2;
	}
	if (ac > 2) {
		return 1;
	}
	bps = parse_bw(av[ac - 1]);
	if (bps == U_PARSE_ERR) {
		if (ac == 2)
			return 1;
		return 2;
	}
	dst->d[0] = bps;
	return 0;
}
/*
 * Per-packet hook for constant bandwidth: the transmission time is the
 * packet length in bits divided by the rate, or 0 when the rate is 0.
 */
static int
const_bw_run(struct _qs *q, struct _cfg *arg)
{
	const uint64_t rate = arg->d[0];

	if (rate)
		q->cur_tt = 8ULL* TIME_UNITS * q->cur_len / rate;
	else
		q->cur_tt = 0;
	return 0;
}
/*
 * Parse "-B ether,bps". Returns 2 if the name is not "ether", 1 on a
 * wrong argument count or unparsable bandwidth, 0 on success with the
 * bandwidth in dst->d[0].
 */
static int
ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
	uint64_t bps;

	(void)q;
	if (strcmp(av[0], "ether") != 0) {
		return 2;
	}
	if (ac != 2) {
		return 1;
	}
	bps = parse_bw(av[ac - 1]);
	if (bps == U_PARSE_ERR) {
		return 1;
	}
	dst->d[0] = bps;
	return 0;
}
/*
 * Per-packet hook for the "ether" bandwidth: like the constant one,
 * but 24 bytes are added to the packet length before computing the
 * transmission time. A rate of 0 gives a transmission time of 0.
 */
static int
ether_bw_run(struct _qs *q, struct _cfg *arg)
{
	const uint64_t rate = arg->d[0];

	if (rate)
		q->cur_tt = 8ULL * TIME_UNITS * (q->cur_len + 24) / rate;
	else
		q->cur_tt = 0;
	return 0;
}
/*
 * Parse "-B real[,scale]": keep the timing of the capture, optionally
 * divided by `scale` (0 < scale <= 1000, default 1). Returns 2 if the
 * name is not "real", 1 on bad arguments, 0 on success with the scale
 * in dst->f[0].
 */
static int
real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
	double scale = 1;

	(void)q;
	if (strcmp(av[0], "real") != 0)
		return 2;
	if (ac > 2)
		return 1;
	if (ac != 1) {
		int err = 0;

		scale = parse_gen(av[ac-1], NULL, &err);
		if (err || scale <= 0 || scale > 1000)
			return 1;
	}
	ED("real -> scale is %.6f", scale);
	dst->f[0] = scale;
	return 0;
}
/*
 * Per-packet hook for "real" bandwidth: divide the transmission time
 * already stored in q->cur_tt by the configured scale.
 */
static int
real_bw_run(struct _qs *q, struct _cfg *arg)
{
	const double scale = arg->f[0];

	q->cur_tt = q->cur_tt / scale;
	return 0;
}
/*
 * Bandwidth options accepted by -B, tried in this order by cmd_apply().
 * The third member is the help text shown when the arguments of a
 * recognised option are invalid.
 */
static struct _cfg bw_cfg[] = {
{ const_bw_parse, const_bw_run,
"constant,bps", TLEM_CFG_END },
{ ether_bw_parse, ether_bw_run,
"ether,bps", TLEM_CFG_END },
{ real_bw_parse, real_bw_run,
"real,scale", TLEM_CFG_END },
{ NULL, NULL, NULL, TLEM_CFG_END }
};
/*
 * Parse "-L [plr,]p": constant packet loss rate 0 <= p <= 1.
 * With two tokens the first must be "plr". The probability is stored
 * in dst->d[0] scaled by 2^24; a warning is printed if a non-zero
 * probability rounds down to 0. Returns 2 if the name does not match,
 * 1 on bad arguments, 0 on success.
 */
static int
const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
	double prob;
	int err;

	(void)q;
	if (strcmp(av[0], "plr") != 0 && ac > 1) {
		return 2;
	}
	if (ac > 2) {
		return 1;
	}
	prob = parse_gen(av[ac-1], NULL, &err);
	if (err || prob < 0 || prob > 1) {
		return 1;
	}
	dst->d[0] = prob * (1<<24);
	if (prob != 0 && dst->d[0] == 0) {
		ED("WWW warning, rounding %le down to 0", prob);
	}
	return 0;
}
/*
 * Per-packet hook for the constant loss rate: drop the packet when a
 * 24-bit random number is below the threshold in arg->d[0].
 * d[1] counts the packets seen and d[2] the packets dropped.
 */
static int
const_plr_run(struct _qs *q, struct _cfg *arg)
{
	uint64_t dice = my_random24();

	q->cur_drop = dice < arg->d[0];
#if 1
	arg->d[1] += 1;
	arg->d[2] += q->cur_drop;
#endif
	return 0;
}
/*
 * Parse "-L ber,p": constant bit error rate 0 <= p <= 1.
 *
 * Builds in dst->arg a table, indexed by packet length in bytes, of
 * loss probabilities scaled by 2^24 and capped at 2^24 - 1: entry i is
 * 1 - (1 - p)^(8 * i). dst->d[0] holds p scaled by 2^24.
 * Returns 2 if the name is not "ber", 1 on bad arguments or allocation
 * failure, 0 on success.
 */
static int
const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
{
	const uint32_t mask = (1<<24) - 1;
	double ber, byte_ok, pkt_ok;
	uint32_t *table;
	int len, err;

	(void)q;
	if (strcmp(av[0], "ber") != 0) {
		return 2;
	}
	if (ac != 2) {
		return 1;
	}
	ber = parse_gen(av[ac-1], NULL, &err);
	if (err || ber < 0 || ber > 1) {
		return 1;
	}
	dst->arg_len = MAX_PKT * sizeof(uint32_t);
	table = calloc(1, dst->arg_len);
	if (table == NULL) {
		return 1;
	}
	dst->arg = table;
	/* probability that one byte is intact: (1 - ber)^8 by squaring */
	byte_ok = 1 - ber;
	byte_ok *= byte_ok;
	byte_ok *= byte_ok;
	byte_ok *= byte_ok;
	pkt_ok = 1;
	for (len = 0; len < MAX_PKT; len++) {
		table[len] = (mask + 1)*(1 - pkt_ok);
		if (table[len] > mask)
			table[len] = mask;
		pkt_ok *= byte_ok;
	}
	dst->d[0] = ber * (mask + 1);
	return 0;
}
/*
 * Per-packet hook for the bit error rate: look up the loss threshold
 * for the packet length (lengths of MAX_PKT or more use the last
 * entry) and drop the packet when a 24-bit random number is below it.
 * d[1] accumulates the bits seen and d[2] the packets dropped.
 */
static int
const_ber_run(struct _qs *q, struct _cfg *arg)
{
	int nbytes = q->cur_len;
	uint64_t dice = my_random24();
	uint32_t *thresholds = arg->arg;

	if (nbytes >= MAX_PKT) {
		RD(5, "pkt len %d too large, trim to %d", nbytes, MAX_PKT-1);
		nbytes = MAX_PKT-1;
	}
	q->cur_drop = dice < thresholds[nbytes];
#if 1
	arg->d[1] += nbytes * 8;
	arg->d[2] += q->cur_drop;
#endif
	return 0;
}
/*
 * Loss options accepted by -L, tried in this order by cmd_apply().
 * The third member is the help text shown when the arguments of a
 * recognised option are invalid.
 */
static struct _cfg loss_cfg[] = {
{ const_plr_parse, const_plr_run,
"plr,prob # 0 <= prob <= 1", TLEM_CFG_END },
{ const_ber_parse, const_ber_run,
"ber,prob # 0 <= prob <= 1", TLEM_CFG_END },
{ NULL, NULL, NULL, TLEM_CFG_END }
};