#include <sys/mman.h>
#include <sys/stat.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <openssl/md4.h>
#include "extern.h"
/*
 * Capacity, in bytes, of the output buffer that download_alloc()
 * allocates for each downloader (stored as "obufmax").
 */
#define OBUF_SIZE (1024 * 1024)
/*
 * States of the per-file download state machine driven by
 * rsync_downloader().
 */
enum downloadst {
/* Waiting to read the index of the next file from the input. */
DOWNLOAD_READ_NEXT = 0,
/* Index read; examine/map the existing file and create the temporary. */
DOWNLOAD_READ_LOCAL,
/* Temporary file open; read data and block tokens from the input. */
DOWNLOAD_READ_REMOTE
};
/*
 * Downloader context.
 * The fields from "state" to "total" describe the file currently being
 * downloaded and are reset by download_reinit(); the remaining ones are
 * set once in download_alloc().
 */
struct download {
/* Current state of the state machine. */
enum downloadst state;
/* Index into "fl" of the file being downloaded. */
size_t idx;
/* Block set for the current file, filled in via blk_send_ack(). */
struct blkset blk;
/* Read-only mapping of "ofd", or MAP_FAILED when nothing is mapped. */
void *map;
/* Length of "map" in bytes (zero when unmapped). */
size_t mapsz;
/* Existing destination file opened read-only, or -1. */
int ofd;
/* Temporary output file, or -1. */
int fd;
/* Name of the temporary output file (heap-allocated), or NULL. */
char *fname;
/* Running MD4 hash of the written data, seeded with the session seed. */
MD4_CTX ctx;
/* Bytes received as literal data from the input. */
off_t downloaded;
/* Total bytes written (literal data plus copied blocks). */
off_t total;
/* File list and its number of entries. */
const struct flist *fl;
size_t flsz;
/* Directory descriptor that paths are resolved against. */
int rootfd;
/* Descriptor from which indices, tokens and data are read. */
int fdin;
/* Output buffer, bytes currently held in it, and its capacity. */
char *obuf;
size_t obufsz;
size_t obufmax;
};
/*
 * Log a one-line summary of a finished download: the file's path, its
 * total size scaled to B/KB/MB/GB, and the percentage of that size
 * that was received as literal data.
 * Does nothing when running in server mode.
 */
static void
log_file(struct sess *sess,
	const struct download *dl, const struct flist *f)
{
	float		 pct, size;
	int		 digits;
	const char	*suffix;

	if (sess->opts->server)
		return;

	/* An empty file counts as fully downloaded. */

	if (dl->total == 0)
		pct = 100.0;
	else
		pct = 100.0 * dl->downloaded / dl->total;

	/* Choose the largest unit strictly exceeded by the size. */

	if (dl->total <= 1024) {
		size = dl->total;
		digits = 0;
		suffix = "B";
	} else if (dl->total <= 1024 * 1024) {
		size = dl->total / 1024.;
		digits = 1;
		suffix = "KB";
	} else if (dl->total <= 1024 * 1024 * 1024) {
		size = dl->total / (1024. * 1024.);
		digits = 2;
		suffix = "MB";
	} else {
		size = dl->total / (1024. * 1024. * 1024.);
		digits = 3;
		suffix = "GB";
	}

	LOG1("%s (%.*f %s, %.1f%% downloaded)",
	    f->path, digits, size, suffix, pct);
}
/*
 * Reset the per-file part of the downloader context so that it is
 * ready to download the file at index "idx".
 * The context must be in the DOWNLOAD_READ_NEXT state.
 * The MD4 context is restarted and primed with the session seed
 * (converted with htole32()).
 */
static void
download_reinit(struct sess *sess, struct download *p, size_t idx)
{
	int32_t	 seed;

	assert(p->state == DOWNLOAD_READ_NEXT);

	/* No file, mapping or temporary is associated yet. */

	p->idx = idx;
	p->fname = NULL;
	p->fd = -1;
	p->ofd = -1;
	p->map = MAP_FAILED;
	p->mapsz = 0;
	p->total = 0;
	p->downloaded = 0;
	memset(&p->blk, 0, sizeof(p->blk));

	/* The hash always starts with the seed. */

	seed = htole32(sess->seed);
	MD4_Init(&p->ctx);
	MD4_Update(&p->ctx, &seed, sizeof(seed));
}
/*
 * Release everything tied to the current file: the mapping, the
 * descriptor of the existing file, the temporary file's descriptor
 * and its name.
 * If "cleanup" is non-zero and a temporary file is open, the
 * temporary is also unlinked relative to the root directory.
 * The context is left in the DOWNLOAD_READ_NEXT state.
 */
static void
download_cleanup(struct download *p, int cleanup)
{
	const int	 havemap = (p->map != MAP_FAILED);

	if (havemap) {
		assert(p->mapsz);
		munmap(p->map, p->mapsz);
		p->mapsz = 0;
		p->map = MAP_FAILED;
	}

	if (p->ofd != -1) {
		close(p->ofd);
		p->ofd = -1;
	}

	/* The temporary is only removed if we actually had it open. */

	if (p->fd != -1) {
		close(p->fd);
		p->fd = -1;
		if (cleanup && p->fname != NULL)
			unlinkat(p->rootfd, p->fname, 0);
	}

	free(p->fname);
	p->fname = NULL;

	p->state = DOWNLOAD_READ_NEXT;
}
/*
 * Allocate a downloader context reading from "fdin", operating on the
 * file list "fl" of "flsz" entries, with paths resolved against
 * "rootfd".
 * An output buffer of OBUF_SIZE bytes is allocated along with it.
 * Returns NULL on allocation failure; otherwise a context that should
 * be released with download_free().
 */
struct download *
download_alloc(struct sess *sess, int fdin,
	const struct flist *fl, size_t flsz, int rootfd)
{
	struct download	*p;

	p = malloc(sizeof(*p));
	if (p == NULL) {
		ERR("malloc");
		return NULL;
	}

	/* The state must be set before download_reinit() checks it. */

	p->state = DOWNLOAD_READ_NEXT;
	p->fdin = fdin;
	p->rootfd = rootfd;
	p->fl = fl;
	p->flsz = flsz;
	download_reinit(sess, p, 0);

	p->obuf = NULL;
	p->obufsz = 0;
	p->obufmax = OBUF_SIZE;

	/* A zero-sized buffer means unbuffered output. */

	if (p->obufmax == 0)
		return p;

	p->obuf = malloc(p->obufmax);
	if (p->obuf == NULL) {
		ERR("malloc");
		free(p);
		return NULL;
	}

	return p;
}
/*
 * Release a downloader context created by download_alloc().
 * Any download still in progress is abandoned: its resources are
 * released and an open temporary file is unlinked.
 * Passing NULL is a no-op.
 */
void
download_free(struct download *p)
{
	if (p != NULL) {
		download_cleanup(p, 1);
		free(p->obuf);
		free(p);
	}
}
static int
buf_copy(const char *buf, size_t sz, struct download *p)
{
size_t rem, tocopy;
ssize_t ssz;
assert(p->obufsz <= p->obufmax);
if (sz && p->obufsz < p->obufmax) {
assert(p->obuf != NULL);
rem = p->obufmax - p->obufsz;
assert(rem > 0);
tocopy = rem < sz ? rem : sz;
memcpy(p->obuf + p->obufsz, buf, tocopy);
sz -= tocopy;
buf += tocopy;
p->obufsz += tocopy;
assert(p->obufsz <= p->obufmax);
if (sz == 0)
return 1;
}
if (p->obufsz) {
assert(p->obufmax);
assert(p->obufsz <= p->obufmax);
assert(p->obuf != NULL);
if ((ssz = write(p->fd, p->obuf, p->obufsz)) < 0) {
ERR("%s: write", p->fname);
return 0;
} else if ((size_t)ssz != p->obufsz) {
ERRX("%s: short write", p->fname);
return 0;
}
p->obufsz = 0;
}
if (sz) {
if ((ssz = write(p->fd, buf, sz)) < 0) {
ERR("%s: write", p->fname);
return 0;
} else if ((size_t)ssz != sz) {
ERRX("%s: short write", p->fname);
return 0;
}
}
return 1;
}
/*
 * Advance the download state machine by one step.
 *
 * In DOWNLOAD_READ_NEXT, read the next file index from the input.
 * A negative index ends the phase (return zero).
 * Otherwise (unless this is a dry run) reset the context, read the
 * block set via blk_send_ack() and try to open the existing file; if
 * that succeeds, its descriptor is stored in "ofd" and we return.
 *
 * In DOWNLOAD_READ_LOCAL, map the existing file (if any and non-empty),
 * create the temporary output file and set "ofd" to -1.
 *
 * In DOWNLOAD_READ_REMOTE, read tokens from the input for as long as
 * io_read_check() reports more: a positive token is followed by that
 * many bytes of literal data, a negative token refers to a block of
 * the mapped file, and a zero token ends the file.
 * On the zero token the MD4 hash is compared with the one read from
 * the input, metadata is applied and the temporary is renamed over
 * the destination path.
 *
 * Returns <0 on failure (the current download is cleaned up and the
 * temporary unlinked), 0 when the phase is complete, >0 otherwise.
 */
int
rsync_downloader(struct download *p, struct sess *sess, int *ofd)
{
	int			 c;
	int32_t			 idx, rawtok;
	const struct flist	*f;
	size_t			 sz, tok;
	struct stat		 st;
	char			*buf = NULL;
	unsigned char		 ourmd[MD4_DIGEST_LENGTH],
				 md[MD4_DIGEST_LENGTH];

	if (p->state == DOWNLOAD_READ_NEXT) {
		if (!io_read_int(sess, p->fdin, &idx)) {
			ERRX1("io_read_int");
			return -1;
		} else if (idx >= 0 && (size_t)idx >= p->flsz) {
			ERRX("index out of bounds");
			return -1;
		} else if (idx < 0) {
			LOG3("downloader: phase complete");
			return 0;
		}

		/* Nothing is transferred in a dry run. */

		if (sess->opts->dry_run)
			return 1;

		download_reinit(sess, p, idx);
		if (!blk_send_ack(sess, p->fdin, &p->blk)) {
			ERRX1("blk_send_ack");
			goto out;
		}

		/*
		 * Try to open the existing file.
		 * A missing file is fine (everything will arrive as
		 * literal data): in that case fall through to the
		 * next state right away.
		 */

		p->state = DOWNLOAD_READ_LOCAL;
		f = &p->fl[idx];
		p->ofd = openat(p->rootfd, f->path, O_RDONLY | O_NONBLOCK);

		if (p->ofd == -1 && errno != ENOENT) {
			ERR("%s: openat", f->path);
			goto out;
		} else if (p->ofd != -1) {
			*ofd = p->ofd;
			return 1;
		}
	}

	f = &p->fl[p->idx];

	if (p->state == DOWNLOAD_READ_LOCAL) {
		assert(p->fname == NULL);

		/* "st" is only valid (and only used) when ofd is open. */

		if (p->ofd != -1 &&
		    fstat(p->ofd, &st) == -1) {
			ERR("%s: fstat", f->path);
			goto out;
		} else if (p->ofd != -1 && !S_ISREG(st.st_mode)) {
			WARNX("%s: not regular", f->path);
			goto out;
		}

		/* An empty file cannot be mapped, so leave it unmapped. */

		if (p->ofd != -1 && st.st_size > 0) {
			p->mapsz = st.st_size;
			p->map = mmap(NULL, p->mapsz,
				PROT_READ, MAP_SHARED, p->ofd, 0);
			if (p->map == MAP_FAILED) {
				ERR("%s: mmap", f->path);
				goto out;
			}
		}

		*ofd = -1;

		/* Create the temporary file that receives the data. */

		if (mktemplate(&p->fname, f->path, sess->opts->recursive) ==
		    -1) {
			ERRX1("mktemplate");
			goto out;
		}

		if ((p->fd = mkstempat(p->rootfd, p->fname)) == -1) {
			ERR("mkstempat");
			goto out;
		}

		LOG3("%s: temporary: %s", f->path, p->fname);
		p->state = DOWNLOAD_READ_REMOTE;
		return 1;
	}

again:
	assert(p->state == DOWNLOAD_READ_REMOTE);
	assert(p->fname != NULL);
	assert(p->fd != -1);
	assert(p->fdin != -1);

	if (!io_read_int(sess, p->fdin, &rawtok)) {
		ERRX1("io_read_int");
		goto out;
	}

	if (rawtok > 0) {
		/* Literal data: "rawtok" bytes follow on the input. */

		sz = rawtok;
		if ((buf = malloc(sz)) == NULL) {
			ERR("malloc");
			goto out;
		}

		/* The buffer is ours, so release it on every exit. */

		if (!io_read_buf(sess, p->fdin, buf, sz)) {
			ERRX1("io_read_buf");
			free(buf);
			goto out;
		} else if (!buf_copy(buf, sz, p)) {
			ERRX1("buf_copy");
			free(buf);
			goto out;
		}
		p->total += sz;
		p->downloaded += sz;
		LOG4("%s: received %zu B block", p->fname, sz);
		MD4_Update(&p->ctx, buf, sz);
		free(buf);
		buf = NULL;

		/* Keep going for as long as input is already waiting. */

		if ((c = io_read_check(p->fdin)) < 0) {
			ERRX1("io_read_check");
			goto out;
		} else if (c > 0)
			goto again;

		return 1;
	} else if (rawtok < 0) {
		/*
		 * Block reference: token -(n + 1) names block n.
		 * Add before negating so that INT32_MIN does not
		 * overflow.
		 */

		tok = -(rawtok + 1);
		if (tok >= p->blk.blksz) {
			ERRX("%s: token not in block set: %zu (have %zu blocks)",
			    p->fname, tok, p->blk.blksz);
			goto out;
		}

		/*
		 * The last block has the remainder length.
		 * NOTE(review): a zero remainder is taken to mean that
		 * the last block is full-sized; confirm against the
		 * code that builds the block set.
		 */

		sz = (tok == p->blk.blksz - 1 && p->blk.rem) ?
		    p->blk.rem : p->blk.len;

		/*
		 * The token comes from the input, so make sure the
		 * block really lies within what we have mapped instead
		 * of trusting it (the check is written so that the
		 * offset computation cannot overflow).
		 */

		if (sz == 0 || p->map == MAP_FAILED || sz > p->mapsz ||
		    (p->blk.len != 0 &&
		     tok > (p->mapsz - sz) / p->blk.len)) {
			ERRX("%s: block %zu lies outside of mapped file",
			    p->fname, tok);
			goto out;
		}

		/* This points into the mapping: it must not be freed. */

		buf = (char *)p->map + (tok * p->blk.len);

		if (!buf_copy(buf, sz, p)) {
			ERRX1("buf_copy");
			goto out;
		}
		p->total += sz;
		LOG4("%s: copied %zu B", p->fname, sz);
		MD4_Update(&p->ctx, buf, sz);

		/* Keep going for as long as input is already waiting. */

		if ((c = io_read_check(p->fdin)) < 0) {
			ERRX1("io_read_check");
			goto out;
		} else if (c > 0)
			goto again;

		return 1;
	}

	/* Zero token: end of file.  Write out what is still buffered. */

	if (!buf_copy(NULL, 0, p)) {
		ERRX1("buf_copy");
		goto out;
	}

	assert(rawtok == 0);
	assert(p->obufsz == 0);

	/* Our hash of the written data must match the one sent to us. */

	MD4_Final(ourmd, &p->ctx);

	if (!io_read_buf(sess, p->fdin, md, MD4_DIGEST_LENGTH)) {
		ERRX1("io_read_buf");
		goto out;
	} else if (memcmp(md, ourmd, MD4_DIGEST_LENGTH)) {
		ERRX("%s: hash does not match", p->fname);
		goto out;
	}

	/* Apply metadata, then move the temporary into place. */

	if (!rsync_set_metadata(sess, 1, p->fd, f, p->fname)) {
		ERRX1("rsync_set_metadata");
		goto out;
	}

	if (renameat(p->rootfd, p->fname, p->rootfd, f->path) == -1) {
		ERR("%s: renameat: %s", p->fname, f->path);
		goto out;
	}

	log_file(sess, p, f);
	download_cleanup(p, 0);
	return 1;
out:
	download_cleanup(p, 1);
	return -1;
}