#include <sys/types.h>
#include <err.h>
#include <inttypes.h>
#include <md5.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#if defined(MKUZ_DEBUG)
# include <stdio.h>
#endif
#include "mkuz_conveyor.h"
#include "mkuz_cfg.h"
#include "mkuzip.h"
#include "mkuz_blk.h"
#include "mkuz_format.h"
#include "mkuz_fqueue.h"
#include "mkuz_blk_chain.h"
/* Forward declaration: compute_digest() is defined after cworker(), which calls it. */
static void compute_digest(struct mkuz_blk *);
/*
 * Start-up arguments handed to a cworker() thread.  One instance is
 * heap-allocated per thread by mkuz_conveyor_ctor() and freed by the
 * worker as soon as it has copied both pointers out.
 */
struct cw_args {
/* Conveyor whose wrk_queue the worker consumes and whose results queue it feeds. */
struct mkuz_conveyor *cvp;
/* Configuration the worker reads (handler, comp_level, no_zcomp, en_dedup, cbound_blksz). */
struct mkuz_cfg *cfp;
};
/*
 * Worker thread entry point.
 *
 * `p' is a heap-allocated struct cw_args; the worker copies the two
 * pointers out of it and frees it immediately.  It then creates a
 * compressor context through the configured handler and keeps pulling
 * input blocks off the conveyor's work queue, pushing exactly one output
 * block onto the results queue for every input block consumed.
 *
 * The loop ends when the MKUZ_BLK_EOF marker is dequeued.  The marker is
 * put back on the work queue before returning (presumably so that the
 * remaining workers sharing the queue observe it as well).
 *
 * Always returns NULL.
 */
static void *
cworker(void *p)
{
	struct cw_args *args = (struct cw_args *)p;
	struct mkuz_cfg *cfg = args->cfp;
	struct mkuz_conveyor *conv = args->cvp;
	struct mkuz_blk *in, *out;
	void *zctx;

	/* The argument block is owned by this thread; it is no longer needed. */
	free(args);

	zctx = cfg->handler->f_init(&cfg->comp_level);

	while ((in = mkuz_fqueue_deq(conv->wrk_queue)) != MKUZ_BLK_EOF) {
		if (cfg->no_zcomp != 0 ||
		    mkuz_memvcmp(in->data, '\0', in->info.len) == 0) {
			/* Regular path: compress, then digest if dedup is on. */
			out = mkuz_blk_ctor(cfg->cbound_blksz);
			cfg->handler->f_compress(zctx, in, out);
			if (cfg->en_dedup != 0) {
				compute_digest(out);
			}
		} else {
			/*
			 * no_zcomp is clear and mkuz_memvcmp() returned
			 * non-zero for a '\0' comparison over the whole
			 * input (presumably: the block is all zeroes).
			 * Emit a block constructed with size 0 instead of
			 * running the compressor.
			 */
			out = mkuz_blk_ctor(0);
		}

		/* Carry the block number across so results can be matched up. */
		out->info.blkno = in->info.blkno;
		mkuz_fqueue_enq(conv->results, out);
		free(in);
	}

	/* `in' holds the EOF marker here; hand it back to the work queue. */
	mkuz_fqueue_enq(conv->wrk_queue, in);

	return (NULL);
}
/*
 * Compute the MD5 digest of the first bp->info.len bytes of bp->data and
 * store the result in bp->info.digest.
 */
static void
compute_digest(struct mkuz_blk *bp)
{
	MD5_CTX md5;

	MD5Init(&md5);
	MD5Update(&md5, bp->data, bp->info.len);
	/* Finalize straight into the block's info record. */
	MD5Final(bp->info.digest, &md5);
}
/*
 * Create a conveyor and start its worker threads.
 *
 * Allocates a struct mkuz_conveyor with extra room for cfp->nworkers
 * pthread_t handles, creates the work and results queues, and starts
 * cfp->nworkers threads running cworker().  Each thread receives its own
 * heap-allocated struct cw_args, which the thread itself frees.
 *
 * cfp is stored in every worker's arguments and dereferenced by the
 * workers afterwards, so it must outlive them.
 *
 * Returns the new conveyor.  If a thread cannot be created the process
 * exits with status 1 via errx(), reporting the reason returned by
 * pthread_create().
 */
struct mkuz_conveyor *
mkuz_conveyor_ctor(struct mkuz_cfg *cfp)
{
	struct mkuz_conveyor *cp;
	struct cw_args *cwp;
	int i, r;

	/* Thread handles live in cp->wthreads[], sized by the extra space here. */
	cp = mkuz_safe_zmalloc(sizeof(struct mkuz_conveyor) +
	    (sizeof(pthread_t) * cfp->nworkers));

	cp->wrk_queue = mkuz_fqueue_ctor(1);
	cp->results = mkuz_fqueue_ctor(1);

	for (i = 0; i < cfp->nworkers; i++) {
		cwp = mkuz_safe_zmalloc(sizeof(struct cw_args));
		cwp->cfp = cfp;
		cwp->cvp = cp;
		r = pthread_create(&cp->wthreads[i], NULL, cworker, cwp);
		if (r != 0) {
			/*
			 * pthread_create() reports failure through its return
			 * value rather than errno, so format it explicitly.
			 */
			errx(1, "mkuz_conveyor_ctor: pthread_create() failed: %s",
			    strerror(r));
			/* Not reached */
		}
	}

	return (cp);
}