root/src/servers/app/DelayedMessage.cpp
/*
 * Copyright 2015, Haiku.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *                      Joseph Groover <looncraz@looncraz.net>
*/


#include "DelayedMessage.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <Autolock.h>
#include <String.h>

#include <LinkSender.h>
#include <ServerProtocol.h>


// DelayedMessageSender constants

// Codes read from the sender's port by DelayedMessageSender::_MessageLoop().
// They are offset from AS_LAST_CODE (presumably to stay clear of the regular
// server protocol codes -- confirm against ServerProtocol.h).
static const int32 kWakeupMessage = AS_LAST_CODE + 2048;
static const int32 kExitMessage = kWakeupMessage + 1;

// Name and priority of the sender thread, and the capacity of its port.
// These are never modified, so they are declared const.
static const char* const kName = "DMT is here for you, eventually...";
static const int32 kPriority = B_URGENT_DISPLAY_PRIORITY;
static const int32 kPortCapacity = 10;


//! Data attachment structure.
// An attachment first only references the caller's buffer (constData); the
// private copy (data) is allocated later by DelayedMessageData::CopyData()
// and released by the destructor.
struct Attachment {
                                                                Attachment(const void* data, size_t size);
                                                                ~Attachment();

                        // The caller's original buffer; never freed by this struct.
                        const void*                     constData;
                        // malloc()ed copy of constData, or NULL while not yet copied.
                        void*                           data;
                        // Number of bytes in constData (and in data, once copied).
                        size_t                          size;
};


// List of the attachments of one message (BObjectList instantiated with
// its second template argument set to true).
typedef BObjectList<Attachment, true> AttachmentList;


/*!     \class ScheduledMessage
        \brief Responsible for sending of delayed message.

        Takes over the DelayedMessageData of a DelayedMessage (via HandOff())
        and deletes it in its destructor.
*/
class ScheduledMessage {
public:
                                                                ScheduledMessage(DelayedMessage& message);
                                                                ~ScheduledMessage();

                        // Number of targets that still have to receive the message.
                        int32                           CountTargets() const;

                        // NOTE(review): declared here, but no definition is visible
                        // in this file.
                        void                            Finalize();
                        // Time the message is due; 0 if there is no data.
                        bigtime_t                       ScheduledTime() const;
                        // Sends to all remaining targets, returns how many succeeded.
                        int32                           SendMessage();
                        // True if the data exists and was copied successfully.
                        bool                            IsValid() const;
                        // Tries to fold \a message into this pending message.
                        bool                            Merge(DelayedMessage& message);

                        // Sends to a single port; on success (or if the port is gone)
                        // the port is removed from the target list.
                        status_t                        SendMessageToPort(port_id port);
                        // Orders by scheduled time; false if either side is invalid.
                        bool                            operator<(const ScheduledMessage& other) const;

                        // Owned message data, NULL if the hand-off failed.
                        DelayedMessageData*     fData;
};


/*!     \class DelayedMessageSender DelayedMessageSender.h
        \brief Responsible for scheduling and sending of delayed messages

        Owns a thread that waits on a port (see _MessageLoop()) and sends the
        scheduled messages once they are due.
*/
class DelayedMessageSender {
public:
                        explicit                        DelayedMessageSender();
                                                                ~DelayedMessageSender();

                        // Merges \a message into a pending one or queues it.
                        status_t                        ScheduleMessage (DelayedMessage& message);

                        // Number of messages currently queued.
                        int32                           CountDelayedMessages() const;
                        // Running total of successful sends.
                        int64                           CountSentMessages() const;

private:
                        void                            _MessageLoop();
                        int32                           _SendDelayedMessages();
        static  int32                           _thread_func(void* sender);
                        void                            _Wakeup(bigtime_t whatTime);

private:
        typedef BObjectList<ScheduledMessage, true> ScheduledList;

        // Locked around accesses to fMessages.
        mutable BLocker                         fLock;
                        ScheduledList           fMessages;

                        // Next wakeup of the sender thread, accessed with the atomic
                        // 64 bit functions; B_INFINITE_TIMEOUT when nothing is pending.
                        bigtime_t                       fScheduledWakeup;

                        // Non-zero after a wakeup message could not be delivered in
                        // time (see _Wakeup()).
                        int32                           fWakeupRetry;
                        // NOTE: the constructor initializes fThread before fPort, in
                        // this declaration order.
                        thread_id                       fThread;
                        port_id                         fPort;

        mutable int64                           fSentCount;
};


// The sender instance that DelayedMessage::Flush() schedules messages with.
DelayedMessageSender gDelayedMessageSender;


/*!     \class DelayedMessageData DelayedMessageSender.h
        \brief Owns DelayedMessage data, allocates memory and copies data only
                        when needed.

        Holds the message code, the scheduled time, the attachments, the target
        ports, the merge settings and an optional failure callback.
*/
class DelayedMessageData {
        typedef BObjectList<port_id, true> PortList;
        typedef void(*FailureCallback)(int32 code, port_id port, void* data);

public:
                                                                DelayedMessageData(int32 code, bigtime_t delay,
                                                                        bool isSpecificTime);
                                                                ~DelayedMessageData();

                        // Adds a target port; fails for ports <= 0 and for duplicates.
                        bool                            AddTarget(port_id port);
                        // Removes the target with the given port, if present.
                        void                            RemoveTarget(port_id port);
                        int32                           CountTargets() const;

                        // Adds every target of \a other that is not yet a target here.
                        void                            MergeTargets(DelayedMessageData* other);

                        // Allocates private copies of all attachments; marks the
                        // data valid on success.
                        bool                            CopyData();
                        // Tries to merge \a other (not yet copied) into this data,
                        // according to the merge mode and mask.
                        bool                            MergeData(DelayedMessageData* other);

                        bool                            IsValid() const;
                                // Only valid after a successful CopyData().

                        // Appends an attachment that references \a data; nothing is
                        // copied at this point.
                        status_t                        Attach(const void* data, size_t size);

                        // Compares the copied data of \a one with the original data
                        // of \a two, if the merge mode and mask ask for it.
                        bool                            Compare(Attachment* one, Attachment* two,
                                                                        int32 index);

                        void                            SetMerge(DMMergeMode mode, uint32 mask);
                        // Invokes the failure callback, if one is set.
                        void                            SendFailed(port_id port);

                        void                            SetFailureCallback(FailureCallback callback,
                                                                        void* data);

                        // Accessors.
                        int32&                          Code() {return fCode;}
                        const int32&            Code() const {return fCode;}

                        bigtime_t&                      ScheduledTime() {return fScheduledTime;}
                        const bigtime_t&        ScheduledTime() const {return fScheduledTime;}

                        AttachmentList&         Attachments() {return fAttachments;}
                        const AttachmentList&   Attachments() const {return fAttachments;}

                        PortList&                       Targets() {return fTargets;}
                        const PortList&         Targets() const {return fTargets;}

private:
                // Data members.

                        int32                           fCode;
                        // Absolute time at which the message is due.
                        bigtime_t                       fScheduledTime;
                        // Set by a successful CopyData().
                        bool                            fValid;

                        AttachmentList          fAttachments;
                        PortList                        fTargets;

                        DMMergeMode                     fMergeMode;
                        // Bit mask indexed by attachment position, see Compare().
                        uint32                          fMergeMask;

                        FailureCallback         fFailureCallback;
                        // Passed through to fFailureCallback unchanged.
                        void*                           fFailureData;
};


// #pragma mark -



/*!     Creates a message with the given \a code. Delays shorter than
        DM_MINIMUM_DELAY are raised to DM_MINIMUM_DELAY. The data object is
        allocated with std::nothrow, so fData may be NULL afterwards.
*/
DelayedMessage::DelayedMessage(int32 code, bigtime_t delay,
                bool isSpecificTime)
        :
        fData(new(std::nothrow) DelayedMessageData(code,
                delay >= DM_MINIMUM_DELAY ? delay : DM_MINIMUM_DELAY,
                isSpecificTime)),
        fHandedOff(false)
{
}


//! Deletes the data unless it has been handed off for sending.
DelayedMessage::~DelayedMessage()
{
        // After a hand-off the data is no longer ours to delete.
        if (fHandedOff)
                return;

        // Message is canceled without a handoff.
        delete fData;
}


//! Adds a target port; fails without data or after the hand-off.
bool
DelayedMessage::AddTarget(port_id port)
{
        const bool editable = fData != NULL && !fHandedOff;
        return editable && fData->AddTarget(port);
}


//! Sets the merge mode and mask; ignored without data or after the hand-off.
void
DelayedMessage::SetMerge(DMMergeMode mode, uint32 match)
{
        if (fData != NULL && !fHandedOff)
                fData->SetMerge(mode, match);
}


/*!     Registers \a callback (with its \a data argument) to be invoked when
        sending fails; ignored without data or after the hand-off.
*/
void
DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*),
        void* data)
{
        if (fData != NULL && !fHandedOff)
                fData->SetFailureCallback(callback, data);
}


/*!     Attach data to message. Memory is not allocated nor copied until handoff.
        \return B_NO_MEMORY without data object, B_ERROR after the hand-off,
                B_BAD_VALUE for a NULL or empty buffer, otherwise the result of
                DelayedMessageData::Attach().
*/
status_t
DelayedMessage::Attach(const void* data, size_t size)
{
        status_t result;

        if (fData == NULL)
                result = B_NO_MEMORY;
        else if (fHandedOff)
                result = B_ERROR;
        else if (data == NULL || size == 0)
                result = B_BAD_VALUE;
        else
                result = fData->Attach(data, size);

        return result;
}


/*!     Schedules the message with the global sender.
        \return B_NO_MEMORY without data object, B_ERROR after the hand-off,
                B_BAD_VALUE without any target, otherwise the result of
                DelayedMessageSender::ScheduleMessage().
*/
status_t
DelayedMessage::Flush()
{
        status_t result;

        if (fData == NULL)
                result = B_NO_MEMORY;
        else if (fHandedOff)
                result = B_ERROR;
        else if (fData->CountTargets() == 0)
                result = B_BAD_VALUE;
        else
                result = gDelayedMessageSender.ScheduleMessage(*this);

        return result;
}


/*!     The data handoff occurs upon scheduling and reduces copies to only
        when a message is actually scheduled. Canceled messages have low cost.

        \return The data object once its attachments have been copied, or NULL
                if there is no data, it was handed off before, or copying failed.
*/
DelayedMessageData*
DelayedMessage::HandOff()
{
        if (fData == NULL || fHandedOff)
                return NULL;

        // A failed copy leaves the message with us (fHandedOff stays false).
        if (!fData->CopyData())
                return NULL;

        fHandedOff = true;
        return fData;
}


// #pragma mark -


//! References the caller's buffer; no memory is allocated or copied here.
Attachment::Attachment(const void* _data, size_t _size)
        :
        constData(_data),
                // the original buffer
        data(NULL),
                // no private copy yet
        size(_size)
{
}


//! Releases the private copy, if one was made.
Attachment::~Attachment()
{
        if (data != NULL)
                free(data);
}


// #pragma mark -


/*!     \param code The message code to send.
        \param delay Either an absolute time (\a isSpecificTime is true) or a
                delay relative to the current system_time().
        \param isSpecificTime Selects how \a delay is interpreted.
*/
DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay,
        bool isSpecificTime)
        :
        fCode(code),
        fScheduledTime(isSpecificTime ? delay : delay + system_time()),
        fValid(false),
                // becomes true in CopyData()

        fAttachments(3),
        fTargets(4),

        fMergeMode(DM_NO_MERGE),
        fMergeMask(DM_DATA_DEFAULT),

        fFailureCallback(NULL),
        fFailureData(NULL)
{
}


//! Nothing to do explicitly; the members clean up after themselves.
DelayedMessageData::~DelayedMessageData()
{
}


/*!     Adds \a port to the list of targets.
        \return \c true if the port was added; \c false if the port is invalid
                (<= 0), already a target, or memory could not be allocated.
*/
bool
DelayedMessageData::AddTarget(port_id port)
{
        if (port <= 0)
                return false;

        // check for duplicates:
        for (int32 index = 0; index < fTargets.CountItems(); ++index) {
                if (port == *fTargets.ItemAt(index))
                        return false;
        }

        // Never hand a NULL entry to the list: the loops over fTargets
        // dereference every item.
        port_id* target = new(std::nothrow) port_id(port);
        if (target == NULL)
                return false;

        if (!fTargets.AddItem(target)) {
                // The list did not take the item, so it is still ours to free.
                delete target;
                return false;
        }

        return true;
}


//! Removes (and deletes) the target entry whose value equals \a port.
void
DelayedMessageData::RemoveTarget(port_id port)
{
        if (port == B_BAD_PORT_ID)
                return;

        // Entries are matched by value; the loop ends at the first match.
        const int32 count = fTargets.CountItems();
        for (int32 i = 0; i < count; i++) {
                port_id* candidate = fTargets.ItemAt(i);
                if (*candidate != port)
                        continue;

                fTargets.RemoveItem(candidate, true);
                break;
        }
}


//! Returns the number of target ports currently in the list.
int32
DelayedMessageData::CountTargets() const
{
        const int32 count = fTargets.CountItems();
        return count;
}


//! Adds all targets of \a other to this message's targets.
void
DelayedMessageData::MergeTargets(DelayedMessageData* other)
{
        // AddTarget() returning false is not treated as an error here: it
        // also does so for targets we already have, so we simply carry on.
        PortList& theirs = other->fTargets;
        for (int32 i = 0; i < theirs.CountItems(); i++) {
                port_id* port = theirs.ItemAt(i);
                AddTarget(*port);
        }
}


//! Copy data from original location - merging failed
bool
DelayedMessageData::CopyData()
{
        Attachment* attached = NULL;

        for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
                attached = fAttachments.ItemAt(index);

                if (attached == NULL || attached->data != NULL)
                        return false;

                attached->data = malloc(attached->size);
                if (attached->data == NULL)
                        return false;

                memcpy(attached->data, attached->constData, attached->size);
        }

        fValid = true;
        return true;
}


/*!     Tries to merge \a other, whose data has not been copied yet, into this
        already copied message.

        Both messages must have the same code, the same merge mode (other than
        DM_NO_MERGE) and the same number of attachments.
        \return \c true if \a other was merged and does not need to be
                scheduled on its own.
*/
bool
DelayedMessageData::MergeData(DelayedMessageData* other)
{
        if (!fValid || other == NULL)
                return false;

        if (other->fCode != fCode)
                return false;

        // Merging must be enabled, and both sides have to agree on the mode.
        if (fMergeMode == DM_NO_MERGE || other->fMergeMode != fMergeMode)
                return false;

        const int32 count = fAttachments.CountItems();
        if (other->fAttachments.CountItems() != count)
                return false;

        // DM_MERGE_CANCEL: only the targets are taken over, no data compare.
        if (other->fMergeMode == DM_MERGE_CANCEL) {
                MergeTargets(other);
                return true;
        }

        // Every attachment pair has to be comparable and pass Compare().
        for (int32 i = 0; i < count; i++) {
                Attachment* mine = fAttachments.ItemAt(i);
                Attachment* theirs = other->fAttachments.ItemAt(i);

                const bool comparable = mine != NULL
                        && theirs != NULL
                        && mine->data != NULL
                        && theirs->constData != NULL
                        && mine->size == theirs->size;

                if (!comparable || !Compare(mine, theirs, i))
                        return false;
        }

        // add any targets not included in the existing message!
        MergeTargets(other);

        // DM_MERGE_DUPLICATES needs no copy. Anything else (DM_MERGE_REPLACE)
        // overwrites our buffers, which are already allocated, with the other
        // message's original data.
        if (fMergeMode != DM_MERGE_DUPLICATES) {
                for (int32 i = 0; i < count; i++) {
                        Attachment* mine = fAttachments.ItemAt(i);
                        Attachment* theirs = other->fAttachments.ItemAt(i);

                        memcpy(mine->data, theirs->constData, mine->size);
                }
        }

        return true;
}


bool
DelayedMessageData::IsValid() const
{
        return fValid;
}


/*!     Appends an attachment referencing \a data. The arguments are expected
        to have been checked by the caller (see DelayedMessage::Attach()).
        \return B_OK on success, B_NO_MEMORY if the attachment could not be
                allocated, B_ERROR if it could not be added to the list.
*/
status_t
DelayedMessageData::Attach(const void* data, size_t size)
{
        Attachment* item = new(std::nothrow) Attachment(data, size);
        if (item == NULL)
                return B_NO_MEMORY;

        if (fAttachments.AddItem(item))
                return B_OK;

        // The list did not take the item, so free it again.
        delete item;
        return B_ERROR;
}


/*!     Compares the copied data of \a one with the original data of \a two,
        if the merge mode and mask require the attachment at \a index to match.

        - DM_MERGE_DUPLICATES: with the default mask all data must match,
          otherwise only the attachments whose mask bit is set.
        - DM_MERGE_REPLACE: with the default mask nothing needs to match,
          otherwise only the attachments whose mask bit is set.

        \return \c true if no comparison is required or the data is equal.
*/
bool
DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index)
{
        // fMergeMask has 32 bits; an attachment beyond that can never be
        // selected by it. The shift is done unsigned and only for valid
        // positions, as shifting a signed 1 by 31 or more bits is undefined.
        const bool selected = index >= 0 && index < 32
                && (fMergeMask & (uint32(1) << index)) != 0;

        bool mustMatch = false;
        if (fMergeMode == DM_MERGE_DUPLICATES) {
                // Default-policy: all data must match
                mustMatch = fMergeMask == DM_DATA_DEFAULT || selected;
        } else if (fMergeMode == DM_MERGE_REPLACE) {
                // Default Policy: no data needs to match
                mustMatch = fMergeMask != DM_DATA_DEFAULT && selected;
        }

        if (!mustMatch)
                return true;

        return memcmp(one->data, two->constData, one->size) == 0;
}


//! Stores the merge mode and the attachment mask used by Compare().
void
DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask)
{
        fMergeMask = mask;
        fMergeMode = mode;
}


//! Reports a failed send to \a port through the failure callback, if set.
void
DelayedMessageData::SendFailed(port_id port)
{
        if (fFailureCallback == NULL)
                return;

        fFailureCallback(fCode, port, fFailureData);
}


//! Stores the failure callback and the pointer that is passed back to it.
void
DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data)
{
        fFailureData = data;
        fFailureCallback = callback;
}


// #pragma mark -


/*!     Takes over the data of \a message. fData is NULL if the hand-off
        failed, see IsValid().
*/
ScheduledMessage::ScheduledMessage(DelayedMessage& message)
        :
        fData(message.HandOff())
                // HandOff() copies the attachments and returns the data object
{
}


//! Deletes the data that was taken over in the constructor.
ScheduledMessage::~ScheduledMessage()
{
        // deleting NULL is fine if the hand-off failed
        delete fData;
}


//! Returns the number of remaining targets, 0 without data.
int32
ScheduledMessage::CountTargets() const
{
        return fData != NULL ? fData->CountTargets() : 0;
}


//! Returns the time the message is due, 0 without data.
bigtime_t
ScheduledMessage::ScheduledTime() const
{
        bigtime_t when = 0;
        if (fData != NULL)
                when = fData->ScheduledTime();

        return when;
}


/*!     Send our message and data to their intended target(s)

        Targets that were served (or whose port no longer exists) are removed
        from the target list by SendMessageToPort(); targets that timed out
        stay in the list so that they can be retried later. For every failure
        other than a timeout the failure callback is invoked.

        \return The number of targets the message was successfully sent to.
*/
int32
ScheduledMessage::SendMessage()
{
        if (fData == NULL || !fData->IsValid())
                return 0;

        int32 sent = 0;
        int32 index = 0;
        while (index < fData->CountTargets()) {
                const int32 countBefore = fData->CountTargets();

                port_id port = *(fData->Targets().ItemAt(index));
                status_t error = SendMessageToPort(port);

                if (error == B_OK)
                        ++sent;
                else if (error != B_TIMED_OUT)
                        fData->SendFailed(port);

                // SendMessageToPort() may have removed the target, in which case
                // the next one has moved to "index". Only advance if the list
                // did not shrink, otherwise that target would be skipped.
                if (fData->CountTargets() == countBefore)
                        ++index;
        }

        return sent;
}


/*!     Sends the message code and all attachments to \a port.

        The target is removed from the target list if the message was sent,
        or if flushing reported B_BAD_PORT_ID.

        \return B_BAD_DATA without valid data, B_BAD_VALUE for B_BAD_PORT_ID,
                B_ERROR if the message could not be started, otherwise the result
                of attaching or flushing.
*/
status_t
ScheduledMessage::SendMessageToPort(port_id port)
{
        if (fData == NULL || !fData->IsValid())
                return B_BAD_DATA;
        if (port == B_BAD_PORT_ID)
                return B_BAD_VALUE;

        BPrivate::LinkSender link(port);
        if (link.StartMessage(fData->Code()) != B_OK)
                return B_ERROR;

        // The data has been checked already, so we assume it is all good
        AttachmentList& attachments = fData->Attachments();
        const int32 count = attachments.CountItems();
        for (int32 i = 0; i < count; i++) {
                Attachment* item = attachments.ItemAt(i);

                status_t attachStatus = link.Attach(item->data, item->size);
                if (attachStatus != B_OK) {
                        link.CancelMessage();
                        return attachStatus;
                }
        }

        // We do not want to ever hold up the sender thread for too long, we
        // set a 1 second sending delay, which should be more than enough for
        // 99.992% of all cases.  Approximately.
        status_t flushStatus = link.Flush(1000000);

        // Done with this target, either because it got the message or because
        // its port is reported as bad.
        if (flushStatus == B_BAD_PORT_ID || flushStatus == B_OK)
                fData->RemoveTarget(port);

        return flushStatus;
}


//! Returns whether there is data and it has been copied successfully.
bool
ScheduledMessage::IsValid() const
{
        if (fData == NULL)
                return false;

        return fData->IsValid();
}


/*!     Tries to merge the not yet handed off \a other into this message.
        \return \c true if \a other was merged, \c false if this message is
                invalid or the data could not be merged.
*/
bool
ScheduledMessage::Merge(DelayedMessage& other)
{
        return IsValid() && fData->MergeData(other.Data());
}


/*!     Compares the scheduled times of two messages.
        \return \c true if both messages are valid and this one is due
                earlier than \a other.
*/
bool
ScheduledMessage::operator<(const ScheduledMessage& other) const
{
        const bool bothValid = IsValid() && other.IsValid();
        return bothValid
                && fData->ScheduledTime() < other.fData->ScheduledTime();
}


/*!     Three-way compare function for sorting the list of scheduled messages
        by their scheduled time, earliest first.

        \return A negative value if \a one is due before \a two, a positive
                value if it is due after \a two, and 0 otherwise (which includes
                the case that one of the messages is invalid).
*/
int
CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two)
{
        // Returning the plain result of operator< would only ever yield 0 or
        // 1, i.e. never "less than", so nothing would get ordered properly.
        if (*one < *two)
                return -1;

        if (*two < *one)
                return 1;

        return 0;
}


// #pragma mark -


/*!     Sets up the sender thread and its port.

        The members are initialized in declaration order, so the thread is
        spawned before the port is created. The thread is only resumed in the
        constructor body, i.e. after fPort has been initialized.
        NOTE(review): the results of spawn_thread() and create_port() are not
        checked here.
*/
DelayedMessageSender::DelayedMessageSender()
        :
        fLock("DelayedMessageSender"),
        fMessages(20),
        // nothing scheduled yet
        fScheduledWakeup(B_INFINITE_TIMEOUT),
        fWakeupRetry(0),
        fThread(spawn_thread(&_thread_func, kName, kPriority, this)),
        fPort(create_port(kPortCapacity, "DelayedMessageSender")),
        fSentCount(0)
{
        resume_thread(fThread);
}


/*!     Asks the sender thread to quit, waits for it to exit and then deletes
        the port.
*/
DelayedMessageSender::~DelayedMessageSender()
{
        // write the exit message to our port
        write_port(fPort, kExitMessage, NULL, 0);

        // Wait until the thread is gone. Only an interrupted wait is retried;
        // any other result (B_OK, or an error such as an invalid thread ID)
        // means there is nothing left to wait for.
        status_t status = B_OK;
        while (wait_for_thread(fThread, &status) == B_INTERRUPTED)
                ;

        // The thread no longer uses the port, it is safe to cleanup
        delete_port(fPort);
}


/*!     Schedules \a message for sending.

        If the message can be merged into a message that is already pending,
        nothing is queued. Otherwise the data of \a message is handed off to a
        new entry in the queue and the sender thread is woken up if needed.

        \return B_OK if the message was merged or queued, B_NO_MEMORY if the
                entry could not be allocated, B_BAD_DATA if the hand-off failed,
                B_ERROR if the entry could not be added to the queue.
*/
status_t
DelayedMessageSender::ScheduleMessage(DelayedMessage& message)
{
        BAutolock _(fLock);

        // Can we merge with a pending message?
        ScheduledMessage* pending = NULL;
        for (int32 index = 0; index < fMessages.CountItems(); ++index) {
                pending = fMessages.ItemAt(index);
                if (pending->Merge(message))
                        return B_OK;
        }

        // Guess not, add it to our list!
        ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message);

        if (scheduled == NULL)
                return B_NO_MEMORY;

        if (!scheduled->IsValid()) {
                delete scheduled;
                return B_BAD_DATA;
        }

        if (!fMessages.AddItem(scheduled)) {
                // The list did not take the entry; delete it (and with it the
                // handed off data) instead of leaking it.
                delete scheduled;
                return B_ERROR;
        }

        fMessages.SortItems(&CompareMessages);
        _Wakeup(scheduled->ScheduledTime());
        return B_OK;
}


//! Returns the number of queued messages; takes fLock while counting.
int32
DelayedMessageSender::CountDelayedMessages() const
{
        BAutolock locker(fLock);

        const int32 pending = fMessages.CountItems();
        return pending;
}


//! Returns the sent counter, read atomically without taking the lock.
int64
DelayedMessageSender::CountSentMessages() const
{
        const int64 total = atomic_get64(&fSentCount);
        return total;
}


/*!     Main loop of the sender thread.

        Waits on fPort until the next scheduled wakeup is (almost) due, then
        sends the due messages. Returns when kExitMessage is read from the
        port or when reading the port fails with an unexpected error.
*/
void
DelayedMessageSender::_MessageLoop()
{
        int32 code = -1;
        status_t status = B_TIMED_OUT;
        bigtime_t timeout = B_INFINITE_TIMEOUT;

        while (true) {
                // Turn the scheduled wakeup time into a relative timeout that
                // ends half a DM_MINIMUM_DELAY before the wakeup time.
                timeout = atomic_get64(&fScheduledWakeup);
                if (timeout != B_INFINITE_TIMEOUT)
                        timeout -= (system_time() + (DM_MINIMUM_DELAY / 2));

                // Only wait on the port if the remaining time is worth it,
                // otherwise act as if the wait had already timed out.
                if (timeout > DM_MINIMUM_DELAY / 4) {
                        status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT,
                                timeout);
                } else
                        status = B_TIMED_OUT;

                if (status == B_INTERRUPTED)
                        continue;

                // The wakeup time has been reached: send what is due.
                if (status == B_TIMED_OUT) {
                        _SendDelayedMessages();
                        continue;
                }

                if (status == B_OK) {
                        switch (code) {
                                // Nothing to do but to re-read fScheduledWakeup
                                // at the top of the loop.
                                case kWakeupMessage:
                                        continue;

                                case kExitMessage:
                                        return;

                                // TODO: trace unhandled messages
                                default:
                                        continue;
                        }
                }

                // port deleted?
                if (status < B_OK)
                        break;
        }
}


//! Thread entry point; \a sender is the DelayedMessageSender to run.
int32
DelayedMessageSender::_thread_func(void* sender)
{
        DelayedMessageSender* self = static_cast<DelayedMessageSender*>(sender);
        self->_MessageLoop();

        return 0;
}


/*!     Sends pending messages, call ONLY from sender thread!

        Sends all messages that are due (or about to be due), removes those
        that have no targets left, and stores the time of the next wakeup in
        fScheduledWakeup. fScheduledWakeup is always an absolute time, since
        _MessageLoop() subtracts the current system_time() from it.

        \return The number of successful sends.
*/
int32
DelayedMessageSender::_SendDelayedMessages()
{
        // avoid sending messages during times of contention
        if (fLock.LockWithTimeout(30000) != B_OK) {
                atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY);
                return 0;
        }

        atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT);

        if (fMessages.CountItems() == 0) {
                fLock.Unlock();
                return 0;
        }

        int32 sent = 0;

        bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2;
                // capture any that may be on the verge of being sent.

        // Not an owning list, it merely collects the messages to remove.
        BObjectList<ScheduledMessage> remove;

        ScheduledMessage* message = NULL;
        for (int32 index = 0; index < fMessages.CountItems(); ++index) {
                message = fMessages.ItemAt(index);

                // The first message that is not due yet determines the next
                // wakeup.
                if (message->ScheduledTime() > time) {
                        atomic_set64(&fScheduledWakeup, message->ScheduledTime());
                        break;
                }

                int32 sendCount = message->SendMessage();
                if (sendCount > 0)
                        sent += sendCount;

                if (message->CountTargets() == 0)
                        remove.AddItem(message);
        }

        // remove serviced messages
        for (int32 index = 0; index < remove.CountItems(); ++index)
                fMessages.RemoveItem(remove.ItemAt(index));

        atomic_add64(&fSentCount, sent);

        // catch any partly-failed messages (possibly late):
        if (fMessages.CountItems() > 0
                && atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) {

                fMessages.SortItems(&CompareMessages);
                message = fMessages.ItemAt(0);

                // Retry at the message's scheduled time or, if that has already
                // passed, a minimum delay from now. This has to be an absolute
                // time like every other value stored in fScheduledWakeup; a
                // relative value would be treated as long overdue by
                // _MessageLoop().
                bigtime_t wakeup = message->ScheduledTime();
                if (wakeup <= time)
                        wakeup = system_time() + DM_MINIMUM_DELAY;

                atomic_set64(&fScheduledWakeup, wakeup);
        }

        fLock.Unlock();
        return sent;
}


/*!     Makes sure the sender thread wakes up at \a when at the latest.

        Nothing is done if an earlier wakeup is already scheduled and the
        previous wakeup message did not time out. Otherwise the new wakeup
        time is stored and a wakeup message is sent to the sender's port.
*/
void
DelayedMessageSender::_Wakeup(bigtime_t when)
{
        const bool earlierScheduled = atomic_get64(&fScheduledWakeup) < when;
        if (earlierScheduled && atomic_get(&fWakeupRetry) == 0)
                return;

        atomic_set64(&fScheduledWakeup, when);

        BPrivate::LinkSender link(fPort);
        link.StartMessage(kWakeupMessage);
        status_t flushStatus = link.Flush(30000);

        // Remember a timed out wakeup, so that the next call tries again.
        atomic_set(&fWakeupRetry, flushStatus == B_TIMED_OUT ? 1 : 0);
}