root/src/kits/support/JobQueue.cpp
/*
 * Copyright 2011-2015, Haiku, Inc. All Rights Reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *              Axel Dörfler <axeld@pinc-software.de>
 *              Oliver Tappe <zooey@hirschkaefer.de>
 */


#include <JobQueue.h>

#include <set>

#include <Autolock.h>
#include <Job.h>

#include <JobPrivate.h>


namespace BSupportKit {

namespace BPrivate {


struct JobQueue::JobPriorityLess {
        bool operator()(const BJob* left, const BJob* right) const;
};


/*!     Sort jobs by:
                1. descending count of dependencies (only jobs without dependencies are
                   runnable)
                2. job ticket number (order in which jobs were added to the queue)
*/
bool
JobQueue::JobPriorityLess::operator()(const BJob* left, const BJob* right) const
{
        int32 difference = left->CountDependencies() - right->CountDependencies();
        if (difference < 0)
                return true;
        if (difference > 0)
                return false;

        return left->TicketNumber() < right->TicketNumber();
};


class JobQueue::JobPriorityQueue
        : public std::set<BJob*, JobPriorityLess> {
};


// #pragma mark -


JobQueue::JobQueue()
        :
        fLock("job queue"),
        fNextTicketNumber(1)
{
        fInitStatus = _Init();
}


JobQueue::~JobQueue()
{
        Close();
        delete fQueuedJobs;
}


status_t
JobQueue::InitCheck() const
{
        return fInitStatus;
}


status_t
JobQueue::AddJob(BJob* job)
{
        if (fQueuedJobs == NULL)
                return B_NO_INIT;

        BAutolock lock(&fLock);
        if (!lock.IsLocked())
                return B_ERROR;

        try {
                if (!fQueuedJobs->insert(job).second)
                        return B_NAME_IN_USE;
        } catch (const std::bad_alloc& e) {
                return B_NO_MEMORY;
        } catch (...) {
                return B_ERROR;
        }
        BJob::Private(*job).SetTicketNumber(fNextTicketNumber++);
        job->AddStateListener(this);
        if (job->IsRunnable())
                release_sem(fHaveRunnableJobSem);

        return B_OK;
}


status_t
JobQueue::RemoveJob(BJob* job)
{
        if (fQueuedJobs == NULL)
                return B_NO_INIT;

        BAutolock lock(&fLock);
        if (lock.IsLocked()) {
                try {
                        if (fQueuedJobs->erase(job) == 0)
                                return B_NAME_NOT_FOUND;
                } catch (...) {
                        return B_ERROR;
                }
                BJob::Private(*job).ClearTicketNumber();
                job->RemoveStateListener(this);
        }

        return B_OK;
}


void
JobQueue::JobSucceeded(BJob* job)
{
        BAutolock lock(&fLock);
        if (lock.IsLocked())
                _RequeueDependantJobsOf(job);
}


void
JobQueue::JobFailed(BJob* job)
{
        BAutolock lock(&fLock);
        if (lock.IsLocked())
                _RemoveDependantJobsOf(job);
}


BJob*
JobQueue::Pop()
{
        BJob* job;
        if (Pop(B_INFINITE_TIMEOUT, true, &job) == B_OK)
                return job;

        return NULL;
}


status_t
JobQueue::Pop(bigtime_t timeout, bool returnWhenEmpty, BJob** _job)
{
        BAutolock lock(&fLock);
        if (lock.IsLocked()) {
                while (true) {
                        JobPriorityQueue::iterator head = fQueuedJobs->begin();
                        if (head != fQueuedJobs->end()) {
                                if ((*head)->IsRunnable()) {
                                        *_job = *head;
                                        fQueuedJobs->erase(head);
                                        return B_OK;
                                }
                        } else if (returnWhenEmpty)
                                return B_ENTRY_NOT_FOUND;

                        // we need to wait until a job becomes available/runnable
                        status_t result;
                        do {
                                lock.Unlock();
                                result = acquire_sem_etc(fHaveRunnableJobSem, 1,
                                        B_RELATIVE_TIMEOUT, timeout);
                                if (!lock.Lock())
                                        return B_ERROR;
                        } while (result == B_INTERRUPTED);
                        if (result != B_OK)
                                return result;
                }
        }

        return B_ERROR;
}


size_t
JobQueue::CountJobs() const
{
        BAutolock locker(fLock);
        return fQueuedJobs->size();
}


void
JobQueue::Close()
{
        if (fHaveRunnableJobSem < 0)
                return;

        BAutolock lock(&fLock);
        if (lock.IsLocked()) {
                delete_sem(fHaveRunnableJobSem);
                fHaveRunnableJobSem = -1;

                if (fQueuedJobs != NULL) {
                        // get rid of all jobs
                        for (JobPriorityQueue::iterator iter = fQueuedJobs->begin();
                                iter != fQueuedJobs->end(); ++iter) {
                                delete (*iter);
                        }
                        fQueuedJobs->clear();
                }
        }
}


status_t
JobQueue::_Init()
{
        status_t result = fLock.InitCheck();
        if (result != B_OK)
                return result;

        fQueuedJobs = new (std::nothrow) JobPriorityQueue();
        if (fQueuedJobs == NULL)
                return B_NO_MEMORY;

        fHaveRunnableJobSem = create_sem(0, "have runnable job");
        if (fHaveRunnableJobSem < 0)
                return fHaveRunnableJobSem;

        return B_OK;
}


void
JobQueue::_RequeueDependantJobsOf(BJob* job)
{
        while (BJob* dependantJob = job->DependantJobAt(0)) {
                JobPriorityQueue::iterator found = fQueuedJobs->find(dependantJob);
                bool removed = false;
                if (found != fQueuedJobs->end()) {
                        try {
                                fQueuedJobs->erase(dependantJob);
                                removed = true;
                        } catch (...) {
                        }
                }
                dependantJob->RemoveDependency(job);
                if (removed) {
                        // Only insert a job if it was in our queue before
                        try {
                                fQueuedJobs->insert(dependantJob);
                                if (dependantJob->IsRunnable())
                                        release_sem(fHaveRunnableJobSem);
                        } catch (...) {
                        }
                }
        }
}


void
JobQueue::_RemoveDependantJobsOf(BJob* job)
{
        while (BJob* dependantJob = job->DependantJobAt(0)) {
                try {
                        fQueuedJobs->erase(dependantJob);
                } catch (...) {
                }

                if (dependantJob->State() != B_JOB_STATE_ABORTED) {
                        BJob::Private(*dependantJob).SetState(B_JOB_STATE_ABORTED);
                        BJob::Private(*dependantJob).NotifyStateListeners();
                }

                _RemoveDependantJobsOf(dependantJob);
                dependantJob->RemoveDependency(job);
                // TODO: we need some sort of ownership management
                delete dependantJob;
        }
}


}       // namespace BPrivate

}       // namespace BPackageKit