#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include "pipe.h"
enum kqueue_mode {
KQUEUE_READ,
KQUEUE_READ_EOF,
KQUEUE_WRITE,
KQUEUE_WRITE_EOF,
};
struct context {
enum kqueue_mode c_mode;
int c_alive;
int c_pipe[2];
int c_kq;
char *c_buf;
size_t c_bufsiz;
pthread_t c_th;
pthread_mutex_t c_mtx;
};
static void ctx_setup(struct context *, enum kqueue_mode, int);
static void ctx_teardown(struct context *);
static int ctx_thread_alive(struct context *);
static void ctx_thread_start(struct context *);
static void ctx_lock(struct context *);
static void ctx_unlock(struct context *);
static void *kqueue_thread(void *);
int
test_kqueue_read(void)
{
struct context ctx;
ctx_setup(&ctx, KQUEUE_READ, O_NONBLOCK);
ctx_thread_start(&ctx);
while (ctx_thread_alive(&ctx)) {
ssize_t n;
n = write(ctx.c_pipe[1], &ctx.c_buf[0], 1);
if (n == -1) {
if (errno == EPIPE)
break;
if (errno == EAGAIN)
continue;
err(1, "write");
}
if (n != 1)
errx(1, "write: %ld != 1", n);
}
ctx_teardown(&ctx);
return 0;
}
int
test_kqueue_read_eof(void)
{
struct context ctx;
ctx_setup(&ctx, KQUEUE_READ_EOF, 0);
ctx_thread_start(&ctx);
while (ctx_thread_alive(&ctx)) {
if (ctx.c_pipe[1] == -1)
continue;
close(ctx.c_pipe[1]);
ctx.c_pipe[1] = -1;
}
ctx_teardown(&ctx);
return 0;
}
int
test_kqueue_write(void)
{
struct context ctx;
ssize_t n;
ctx_setup(&ctx, KQUEUE_WRITE, 0);
n = write(ctx.c_pipe[1], ctx.c_buf, ctx.c_bufsiz);
if (n == -1)
err(1, "write");
if ((size_t)n != ctx.c_bufsiz)
errx(1, "write: %ld != %zu", n, ctx.c_bufsiz);
ctx_thread_start(&ctx);
while (ctx_thread_alive(&ctx)) {
unsigned char c;
n = read(ctx.c_pipe[0], &c, 1);
if (n == -1)
err(1, "read");
if (n == 0)
break;
if (n != 1)
errx(1, "read: %ld != 1", n);
}
ctx_teardown(&ctx);
return 0;
}
int
test_kqueue_write_eof(void)
{
return 0;
}
static void
ctx_setup(struct context *ctx, enum kqueue_mode mode, int flags)
{
int error;
ctx->c_mode = mode;
ctx->c_alive = 1;
if (flags) {
if (pipe2(ctx->c_pipe, flags) == -1)
err(1, "pipe");
} else {
if (pipe(ctx->c_pipe) == -1)
err(1, "pipe");
}
ctx->c_kq = kqueue();
if (ctx->c_kq == -1)
err(1, "kqueue");
ctx->c_bufsiz = PIPE_SIZE;
ctx->c_buf = malloc(ctx->c_bufsiz);
if (ctx->c_buf == NULL)
err(1, NULL);
error = pthread_mutex_init(&ctx->c_mtx, NULL);
if (error)
errc(1, error, "pthread_mutex_init");
}
static void
ctx_teardown(struct context *ctx)
{
int error;
error = pthread_join(ctx->c_th, NULL);
if (error)
errc(1, error, "pthread_join");
error = pthread_mutex_destroy(&ctx->c_mtx);
if (error)
errc(1, error, "pthread_mutex_destroy");
free(ctx->c_buf);
close(ctx->c_pipe[0]);
close(ctx->c_pipe[1]);
close(ctx->c_kq);
}
static int
ctx_thread_alive(struct context *ctx)
{
int alive;
ctx_lock(ctx);
alive = ctx->c_alive;
ctx_unlock(ctx);
return alive;
}
static void
ctx_thread_start(struct context *ctx)
{
int error;
error = pthread_create(&ctx->c_th, NULL, kqueue_thread, ctx);
if (error)
errc(1, error, "pthread_create");
}
static void
ctx_lock(struct context *ctx)
{
int error;
error = pthread_mutex_lock(&ctx->c_mtx);
if (error)
errc(1, error, "pthread_mutex_lock");
}
static void
ctx_unlock(struct context *ctx)
{
int error;
error = pthread_mutex_unlock(&ctx->c_mtx);
if (error)
errc(1, error, "pthread_mutex_unlock");
}
static void *
kqueue_thread(void *arg)
{
struct context *ctx = arg;
struct kevent kev;
int fd, filter, nevents;
switch (ctx->c_mode) {
case KQUEUE_READ:
case KQUEUE_READ_EOF:
fd = ctx->c_pipe[0];
filter = EVFILT_READ;
break;
case KQUEUE_WRITE:
case KQUEUE_WRITE_EOF:
fd = ctx->c_pipe[1];
filter = EVFILT_WRITE;
break;
}
EV_SET(&kev, fd, filter, EV_ADD, 0, 0, NULL);
nevents = kevent(ctx->c_kq, &kev, 1, NULL, 0, NULL);
if (nevents == -1)
err(1, "kevent");
nevents = kevent(ctx->c_kq, NULL, 0, &kev, 1, NULL);
if (nevents == -1)
err(1, "kevent");
if (nevents != 1)
errx(1, "kevent: %d != 1", nevents);
if ((int)kev.ident != fd)
errx(1, "kevent: ident");
if (kev.filter != filter)
errx(1, "kevent: filter");
switch (ctx->c_mode) {
case KQUEUE_READ_EOF:
case KQUEUE_WRITE_EOF:
if ((kev.flags & EV_EOF) == 0)
errx(1, "kevent: eof");
break;
default:
break;
}
ctx_lock(ctx);
ctx->c_alive = 0;
ctx_unlock(ctx);
close(fd);
return NULL;
}