root/src/servers/launch/Worker.cpp
/*
 * Copyright 2015, Axel Dörfler, axeld@pinc-software.de.
 * Distributed under the terms of the MIT License.
 */


#include "Worker.h"


static const bigtime_t kWorkerTimeout = 1000000;
        // One second until a worker thread quits without a job

static const int32 kWorkerCountPerCPU = 3;

static int32 sWorkerCount;


Worker::Worker(JobQueue& queue)
        :
        fThread(-1),
        fJobQueue(queue)
{
}


Worker::~Worker()
{
}


status_t
Worker::Init()
{
        fThread = spawn_thread(&Worker::_Process, Name(), B_NORMAL_PRIORITY,
                this);
        if (fThread < 0)
                return fThread;

        status_t status = resume_thread(fThread);
        if (status == B_OK)
                atomic_add(&sWorkerCount, 1);

        return status;
}


status_t
Worker::Process()
{
        while (true) {
                BJob* job;
                status_t status = fJobQueue.Pop(Timeout(), false, &job);
                if (status != B_OK)
                        return status;

                status = Run(job);
                if (status != B_OK) {
                        // TODO: proper error reporting on failed job!
                        debug_printf("Launching %s failed: %s\n", job->Title().String(),
                                strerror(status));
                }
        }
}


bigtime_t
Worker::Timeout() const
{
        return kWorkerTimeout;
}


const char*
Worker::Name() const
{
        return "worker";
}


status_t
Worker::Run(BJob* job)
{
        return job->Run();
}


/*static*/ status_t
Worker::_Process(void* _self)
{
        Worker* self = (Worker*)_self;
        status_t status = self->Process();
        delete self;

        return status;
}


// #pragma mark -


MainWorker::MainWorker(JobQueue& queue)
        :
        Worker(queue),
        fMaxWorkerCount(kWorkerCountPerCPU)
{
        // TODO: keep track of workers, and quit them on destruction
        system_info info;
        if (get_system_info(&info) == B_OK)
                fMaxWorkerCount = info.cpu_count * kWorkerCountPerCPU;
}


bigtime_t
MainWorker::Timeout() const
{
        return B_INFINITE_TIMEOUT;
}


const char*
MainWorker::Name() const
{
        return "main worker";
}


status_t
MainWorker::Run(BJob* job)
{
        int32 count = atomic_get(&sWorkerCount);

        size_t jobCount = fJobQueue.CountJobs();
        if (jobCount > INT_MAX)
                jobCount = INT_MAX;

        if ((int32)jobCount > count && count < fMaxWorkerCount) {
                Worker* worker = new Worker(fJobQueue);
                worker->Init();
        }

        return Worker::Run(job);
}