#include <sys/queue.h>
#include <sys/stat.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>
#include <imsg.h>
#include <expat.h>
#include <openssl/sha.h>
#include "extern.h"
#include "rrdp.h"
#define MAX_SESSIONS 32
#define READ_BUF_SIZE (32 * 1024)
static struct msgbuf *msgq;
#define RRDP_STATE_REQ 0x01
#define RRDP_STATE_WAIT 0x02
#define RRDP_STATE_PARSE 0x04
#define RRDP_STATE_PARSE_ERROR 0x08
#define RRDP_STATE_PARSE_DONE 0x10
#define RRDP_STATE_HTTP_DONE 0x20
#define RRDP_STATE_DONE (RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
struct rrdp {
TAILQ_ENTRY(rrdp) entry;
unsigned int id;
char *notifyuri;
char *local;
char *last_mod;
struct pollfd *pfd;
int infd;
int state;
int aborted;
unsigned int file_pending;
unsigned int file_failed;
enum http_result res;
enum rrdp_task task;
char hash[SHA256_DIGEST_LENGTH];
SHA256_CTX ctx;
struct rrdp_session *repository;
struct rrdp_session *current;
XML_Parser parser;
struct notification_xml *nxml;
struct snapshot_xml *sxml;
struct delta_xml *dxml;
};
static TAILQ_HEAD(, rrdp) states = TAILQ_HEAD_INITIALIZER(states);
char *
xstrdup(const char *s)
{
char *r;
if ((r = strdup(s)) == NULL)
err(1, "strdup");
return r;
}
static void
rrdp_done(unsigned int id, int ok)
{
enum rrdp_msg type = RRDP_END;
struct ibuf *b;
b = io_new_buffer();
io_simple_buffer(b, &type, sizeof(type));
io_simple_buffer(b, &id, sizeof(id));
io_simple_buffer(b, &ok, sizeof(ok));
io_close_buffer(msgq, b);
}
static void
rrdp_http_req(unsigned int id, const char *uri, const char *last_mod)
{
enum rrdp_msg type = RRDP_HTTP_REQ;
struct ibuf *b;
b = io_new_buffer();
io_simple_buffer(b, &type, sizeof(type));
io_simple_buffer(b, &id, sizeof(id));
io_str_buffer(b, uri);
io_opt_str_buffer(b, last_mod);
io_close_buffer(msgq, b);
}
static void
rrdp_state_send(struct rrdp *s)
{
enum rrdp_msg type = RRDP_SESSION;
struct ibuf *b;
b = io_new_buffer();
io_simple_buffer(b, &type, sizeof(type));
io_simple_buffer(b, &s->id, sizeof(s->id));
rrdp_session_buffer(b, s->current);
io_close_buffer(msgq, b);
}
static void
rrdp_clear_repo(struct rrdp *s)
{
enum rrdp_msg type = RRDP_CLEAR;
struct ibuf *b;
b = io_new_buffer();
io_simple_buffer(b, &type, sizeof(type));
io_simple_buffer(b, &s->id, sizeof(s->id));
io_close_buffer(msgq, b);
}
void
rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
unsigned char *data, size_t datasz)
{
enum rrdp_msg type = RRDP_FILE;
struct ibuf *b;
if (s->file_failed == 0) {
b = io_new_buffer();
io_simple_buffer(b, &type, sizeof(type));
io_simple_buffer(b, &s->id, sizeof(s->id));
io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
if (pxml->type != PUB_ADD)
io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
io_str_buffer(b, pxml->uri);
io_buf_buffer(b, data, datasz);
io_close_buffer(msgq, b);
s->file_pending++;
}
}
static void
rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
{
struct rrdp *s;
if ((s = calloc(1, sizeof(*s))) == NULL)
err(1, NULL);
s->infd = -1;
s->id = id;
s->local = local;
s->notifyuri = notify;
s->repository = state;
if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
err(1, NULL);
s->state = RRDP_STATE_REQ;
if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
err(1, "XML_ParserCreate");
s->nxml = new_notification_xml(s->parser, s->repository, s->current,
notify);
TAILQ_INSERT_TAIL(&states, s, entry);
}
static void
rrdp_free(struct rrdp *s)
{
if (s == NULL)
return;
TAILQ_REMOVE(&states, s, entry);
free_notification_xml(s->nxml);
free_snapshot_xml(s->sxml);
free_delta_xml(s->dxml);
if (s->parser)
XML_ParserFree(s->parser);
if (s->infd != -1)
close(s->infd);
free(s->notifyuri);
free(s->local);
free(s->last_mod);
rrdp_session_free(s->repository);
rrdp_session_free(s->current);
free(s);
}
static struct rrdp *
rrdp_get(unsigned int id)
{
struct rrdp *s;
TAILQ_FOREACH(s, &states, entry)
if (s->id == id)
break;
return s;
}
static void
rrdp_failed(struct rrdp *s)
{
unsigned int id = s->id;
s->file_failed = 0;
if (s->task == DELTA && !s->aborted) {
free_delta_xml(s->dxml);
s->dxml = NULL;
rrdp_clear_repo(s);
s->sxml = new_snapshot_xml(s->parser, s->current, s);
s->task = SNAPSHOT;
s->state = RRDP_STATE_REQ;
logx("%s: delta sync failed, fallback to snapshot", s->local);
} else {
rrdp_free(s);
rrdp_done(id, 0);
}
}
static void
rrdp_finished(struct rrdp *s)
{
unsigned int id = s->id;
if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
return;
if (s->file_pending > 0)
return;
if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) {
rrdp_failed(s);
return;
}
if (s->res == HTTP_OK) {
XML_Parser p = s->parser;
if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
warnx("%s: XML error at line %llu: %s", s->local,
(unsigned long long)XML_GetCurrentLineNumber(p),
XML_ErrorString(XML_GetErrorCode(p)));
rrdp_failed(s);
return;
}
if (s->file_failed > 0) {
rrdp_failed(s);
return;
}
switch (s->task) {
case NOTIFICATION:
s->task = notification_done(s->nxml, s->last_mod);
s->last_mod = NULL;
switch (s->task) {
case NOTIFICATION:
logx("%s: repository not modified (%s#%lld)",
s->local, s->repository->session_id,
s->repository->serial);
rrdp_free(s);
rrdp_done(id, 1);
break;
case SNAPSHOT:
logx("%s: downloading snapshot (%s#%lld)",
s->local, s->current->session_id,
s->current->serial);
rrdp_clear_repo(s);
s->sxml = new_snapshot_xml(p, s->current, s);
s->state = RRDP_STATE_REQ;
break;
case DELTA:
logx("%s: downloading %lld deltas (%s#%lld)",
s->local,
s->repository->serial - s->current->serial,
s->current->session_id, s->current->serial);
s->dxml = new_delta_xml(p, s->current, s);
s->state = RRDP_STATE_REQ;
break;
}
break;
case SNAPSHOT:
rrdp_state_send(s);
rrdp_free(s);
rrdp_done(id, 1);
break;
case DELTA:
if (notification_delta_done(s->nxml)) {
rrdp_state_send(s);
rrdp_free(s);
rrdp_done(id, 1);
} else {
free_delta_xml(s->dxml);
s->dxml = new_delta_xml(p, s->current, s);
s->state = RRDP_STATE_REQ;
}
break;
}
} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
logx("%s: notification file not modified (%s#%lld)", s->local,
s->repository->session_id, s->repository->serial);
rrdp_free(s);
rrdp_done(id, 1);
} else {
rrdp_failed(s);
}
}
static void
rrdp_abort_req(struct rrdp *s)
{
unsigned int id = s->id;
s->aborted = 1;
if (s->state == RRDP_STATE_REQ) {
rrdp_free(s);
rrdp_done(id, 1);
return;
}
if (s->state == RRDP_STATE_WAIT)
return;
if (s->infd != -1) {
close(s->infd);
s->infd = -1;
s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR;
}
rrdp_finished(s);
}
static void
rrdp_input_handler(struct ibuf *b)
{
struct rrdp_session *state;
char *local, *notify, *last_mod;
struct rrdp *s;
enum rrdp_msg type;
enum http_result res;
unsigned int id;
int ok;
io_read_buf(b, &type, sizeof(type));
io_read_buf(b, &id, sizeof(id));
switch (type) {
case RRDP_START:
if (ibuf_fd_avail(b))
errx(1, "received unexpected fd");
io_read_str(b, &local);
io_read_str(b, ¬ify);
io_read_buf(b, &ok, sizeof(ok));
if (ok != 0) {
state = rrdp_session_read(b);
} else {
if ((state = calloc(1, sizeof(*state))) == NULL)
err(1, NULL);
}
rrdp_new(id, local, notify, state);
break;
case RRDP_HTTP_INI:
s = rrdp_get(id);
if (s == NULL)
errx(1, "http ini, rrdp session %u does not exist", id);
if (s->state != RRDP_STATE_WAIT)
errx(1, "%s: bad internal state", s->local);
s->infd = ibuf_fd_get(b);
if (s->infd == -1)
errx(1, "expected fd not received");
s->state = RRDP_STATE_PARSE;
if (s->aborted) {
rrdp_abort_req(s);
break;
}
break;
case RRDP_HTTP_FIN:
io_read_buf(b, &res, sizeof(res));
io_read_opt_str(b, &last_mod);
if (ibuf_fd_avail(b))
errx(1, "received unexpected fd");
s = rrdp_get(id);
if (s == NULL)
errx(1, "http fin, rrdp session %u does not exist", id);
if (!(s->state & RRDP_STATE_PARSE))
errx(1, "%s: bad internal state", s->local);
s->state |= RRDP_STATE_HTTP_DONE;
s->res = res;
free(s->last_mod);
s->last_mod = last_mod;
rrdp_finished(s);
break;
case RRDP_FILE:
s = rrdp_get(id);
if (s == NULL)
errx(1, "file, rrdp session %u does not exist", id);
if (ibuf_fd_avail(b))
errx(1, "received unexpected fd");
io_read_buf(b, &ok, sizeof(ok));
if (ok != 1)
s->file_failed++;
s->file_pending--;
if (s->file_pending == 0)
rrdp_finished(s);
break;
case RRDP_ABORT:
if (ibuf_fd_avail(b))
errx(1, "received unexpected fd");
s = rrdp_get(id);
if (s != NULL)
rrdp_abort_req(s);
break;
default:
errx(1, "unexpected message %d", type);
}
}
static void
rrdp_data_handler(struct rrdp *s)
{
char buf[READ_BUF_SIZE];
XML_Parser p = s->parser;
ssize_t len;
len = read(s->infd, buf, sizeof(buf));
if (len == -1) {
warn("%s: read failure", s->local);
rrdp_abort_req(s);
return;
}
if ((s->state & RRDP_STATE_PARSE) == 0)
errx(1, "%s: bad parser state", s->local);
if (len == 0) {
close(s->infd);
s->infd = -1;
if (s->task != NOTIFICATION) {
char h[SHA256_DIGEST_LENGTH];
SHA256_Final(h, &s->ctx);
if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
s->state |= RRDP_STATE_PARSE_ERROR;
warnx("%s: bad message digest", s->local);
}
}
s->state |= RRDP_STATE_PARSE_DONE;
rrdp_finished(s);
return;
}
if (s->task != NOTIFICATION)
SHA256_Update(&s->ctx, buf, len);
if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
warnx("%s: parse error at line %llu: %s", s->local,
(unsigned long long)XML_GetCurrentLineNumber(p),
XML_ErrorString(XML_GetErrorCode(p)));
s->state |= RRDP_STATE_PARSE_ERROR;
}
}
void
proc_rrdp(int fd)
{
struct pollfd pfds[MAX_SESSIONS + 1];
struct rrdp *s, *ns;
struct ibuf *b;
size_t i;
if (pledge("stdio recvfd", NULL) == -1)
err(1, "pledge");
if ((msgq = msgbuf_new_reader(sizeof(size_t), io_parse_hdr, NULL)) ==
NULL)
err(1, NULL);
for (;;) {
i = 1;
memset(&pfds, 0, sizeof(pfds));
TAILQ_FOREACH(s, &states, entry) {
if (i >= MAX_SESSIONS + 1) {
s->pfd = NULL;
continue;
}
if (s->state == RRDP_STATE_REQ) {
const char *uri;
switch (s->task) {
case NOTIFICATION:
rrdp_http_req(s->id, s->notifyuri,
s->repository->last_mod);
break;
case SNAPSHOT:
case DELTA:
uri = notification_get_next(s->nxml,
s->hash, sizeof(s->hash),
s->task);
SHA256_Init(&s->ctx);
rrdp_http_req(s->id, uri, NULL);
break;
}
s->state = RRDP_STATE_WAIT;
}
s->pfd = pfds + i++;
s->pfd->fd = s->infd;
s->pfd->events = POLLIN;
}
pfds[0].fd = fd;
pfds[0].events = POLLIN;
if (msgbuf_queuelen(msgq) > 0)
pfds[0].events |= POLLOUT;
if (poll(pfds, i, INFTIM) == -1) {
if (errno == EINTR)
continue;
err(1, "poll");
}
if (pfds[0].revents & POLLHUP)
break;
if (pfds[0].revents & POLLOUT) {
if (msgbuf_write(fd, msgq) == -1) {
if (errno == EPIPE)
errx(1, "write: connection closed");
else
err(1, "write");
}
}
if (pfds[0].revents & POLLIN) {
switch (msgbuf_read(fd, msgq)) {
case -1:
err(1, "msgbuf_read");
case 0:
errx(1, "msgbuf_read: connection closed");
}
while ((b = io_buf_get(msgq)) != NULL) {
rrdp_input_handler(b);
ibuf_free(b);
}
}
TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
if (s->pfd == NULL)
continue;
if (s->pfd->revents != 0)
rrdp_data_handler(s);
}
}
exit(0);
}