#include <map>
#include <new>
#include <set>
#include <string.h>
#include <AutoDeleter.h>
#include <Autolock.h>
#include <MessagePrivate.h>
#include <MessengerPrivate.h>
#include <OS.h>
#include <StackOrHeapArray.h>
#include <TokenSpace.h>
#include <util/DoublyLinkedList.h>
#include <messaging.h>
#include "Debug.h"
#include "MessageDeliverer.h"
#include "Referenceable.h"
using std::map;
using std::nothrow;
using std::set;
MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
static const bigtime_t kRetryDelay = 100000;
static const int32 kMaxMessagesPerPort = 10000;
static const int32 kMaxDataPerPort = 50 * 1024 * 1024;
MessagingTargetSet::~MessagingTargetSet()
{
}
DefaultMessagingTargetSet::DefaultMessagingTargetSet(
const messaging_target *targets, int32 targetCount)
: MessagingTargetSet(),
fTargets(targets),
fTargetCount(targetCount),
fNextIndex(0)
{
}
DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
{
}
bool
DefaultMessagingTargetSet::HasNext() const
{
return (fNextIndex < fTargetCount);
}
bool
DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
{
if (fNextIndex >= fTargetCount)
return false;
port = fTargets[fNextIndex].port;
token = fTargets[fNextIndex].token;
fNextIndex++;
return true;
}
void
DefaultMessagingTargetSet::Rewind()
{
fNextIndex = 0;
}
SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
: MessagingTargetSet(),
fAtBeginning(true)
{
BMessenger::Private messengerPrivate(target);
fPort = messengerPrivate.Port();
fToken = (messengerPrivate.IsPreferredTarget()
? B_PREFERRED_TOKEN : messengerPrivate.Token());
}
SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
: MessagingTargetSet(),
fPort(port),
fToken(token),
fAtBeginning(true)
{
}
SingleMessagingTargetSet::~SingleMessagingTargetSet()
{
}
bool
SingleMessagingTargetSet::HasNext() const
{
return fAtBeginning;
}
bool
SingleMessagingTargetSet::Next(port_id &port, int32 &token)
{
if (!fAtBeginning)
return false;
port = fPort;
token = fToken;
fAtBeginning = false;
return true;
}
void
SingleMessagingTargetSet::Rewind()
{
fAtBeginning = true;
}
class MessageDeliverer::Message : public BReferenceable {
public:
Message(void *data, int32 dataSize, bigtime_t timeout)
: BReferenceable(),
fData(data),
fDataSize(dataSize),
fCreationTime(system_time()),
fBusy(false)
{
if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
fTimeoutTime = B_INFINITE_TIMEOUT;
else if (timeout <= 0)
fTimeoutTime = fCreationTime;
else
fTimeoutTime = fCreationTime + timeout;
}
~Message()
{
free(fData);
}
void *Data() const
{
return fData;
}
int32 DataSize() const
{
return fDataSize;
}
bigtime_t CreationTime() const
{
return fCreationTime;
}
bigtime_t TimeoutTime() const
{
return fTimeoutTime;
}
bool HasTimeout() const
{
return (fTimeoutTime < B_INFINITE_TIMEOUT);
}
void SetBusy(bool busy)
{
fBusy = busy;
}
bool IsBusy() const
{
return fBusy;
}
private:
void *fData;
int32 fDataSize;
bigtime_t fCreationTime;
bigtime_t fTimeoutTime;
bool fBusy;
};
class MessageDeliverer::TargetMessage
: public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
public:
TargetMessage(Message *message, int32 token)
: fMessage(message),
fToken(token)
{
if (fMessage)
fMessage->AcquireReference();
}
~TargetMessage()
{
if (fMessage)
fMessage->ReleaseReference();
}
Message *GetMessage() const
{
return fMessage;
}
int32 Token() const
{
return fToken;
}
private:
Message *fMessage;
int32 fToken;
};
class MessageDeliverer::TargetMessageHandle {
public:
TargetMessageHandle(TargetMessage *message)
: fMessage(message)
{
}
TargetMessageHandle(const TargetMessageHandle &other)
: fMessage(other.fMessage)
{
}
TargetMessage *GetMessage() const
{
return fMessage;
}
TargetMessageHandle &operator=(const TargetMessageHandle &other)
{
fMessage = other.fMessage;
return *this;
}
bool operator==(const TargetMessageHandle &other) const
{
return (fMessage == other.fMessage);
}
bool operator!=(const TargetMessageHandle &other) const
{
return (fMessage != other.fMessage);
}
bool operator<(const TargetMessageHandle &other) const
{
bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
if (timeout < otherTimeout)
return true;
if (timeout > otherTimeout)
return false;
return (fMessage < other.fMessage);
}
private:
TargetMessage *fMessage;
};
class MessageDeliverer::TargetPort {
public:
TargetPort(port_id portID)
: fPortID(portID),
fMessages(),
fMessageCount(0),
fMessageSize(0)
{
}
~TargetPort()
{
while (!fMessages.IsEmpty())
PopMessage();
}
port_id PortID() const
{
return fPortID;
}
status_t PushMessage(Message *message, int32 token)
{
PRINT("MessageDeliverer::TargetPort::PushMessage(port: %" B_PRId32 ", %p, %"
B_PRId32 ")\n", fPortID, message, token);
TargetMessage *targetMessage
= new(nothrow) TargetMessage(message, token);
if (!targetMessage)
return B_NO_MEMORY;
fMessages.Insert(targetMessage);
fMessageCount++;
fMessageSize += targetMessage->GetMessage()->DataSize();
if (message->HasTimeout())
fTimeoutableMessages.insert(targetMessage);
_EnforceLimits();
return B_OK;
}
Message *PeekMessage(int32 &token) const
{
if (!fMessages.Head())
return NULL;
token = fMessages.Head()->Token();
return fMessages.Head()->GetMessage();
}
void PopMessage()
{
if (fMessages.Head()) {
PRINT("MessageDeliverer::TargetPort::PopMessage(): port: %" B_PRId32 ", %p\n",
fPortID, fMessages.Head()->GetMessage());
_RemoveMessage(fMessages.Head());
}
}
void DropTimedOutMessages()
{
bigtime_t now = system_time();
while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
if (message->GetMessage()->TimeoutTime() > now)
break;
PRINT("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %" B_PRId32
": message %p timed out\n", fPortID, message->GetMessage());
_RemoveMessage(message);
}
}
bool IsEmpty() const
{
return fMessages.IsEmpty();
}
private:
void _RemoveMessage(TargetMessage *message)
{
fMessages.Remove(message);
fMessageCount--;
fMessageSize -= message->GetMessage()->DataSize();
if (message->GetMessage()->HasTimeout())
fTimeoutableMessages.erase(message);
delete message;
}
void _EnforceLimits()
{
while (fMessageCount > kMaxMessagesPerPort) {
PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
": hit maximum message count limit.\n", fPortID);
PopMessage();
}
while (fMessageSize > kMaxDataPerPort) {
PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
": hit maximum message size limit.\n", fPortID);
PopMessage();
}
}
typedef DoublyLinkedList<TargetMessage> MessageList;
port_id fPortID;
MessageList fMessages;
int32 fMessageCount;
int32 fMessageSize;
set<TargetMessageHandle> fTimeoutableMessages;
};
struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
};
MessageDeliverer::MessageDeliverer()
: fLock("message deliverer"),
fTargetPorts(NULL),
fDelivererThread(-1),
fTerminating(false)
{
}
MessageDeliverer::~MessageDeliverer()
{
fTerminating = true;
if (fDelivererThread >= 0) {
int32 result;
wait_for_thread(fDelivererThread, &result);
}
delete fTargetPorts;
}
status_t
MessageDeliverer::Init()
{
fTargetPorts = new(nothrow) TargetPortMap;
if (!fTargetPorts)
return B_NO_MEMORY;
fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
"message deliverer", B_NORMAL_PRIORITY + 1, this);
if (fDelivererThread < 0)
return fDelivererThread;
resume_thread(fDelivererThread);
return B_OK;
}
status_t
MessageDeliverer::CreateDefault()
{
if (sDeliverer)
return B_OK;
MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
if (!deliverer)
return B_NO_MEMORY;
status_t error = deliverer->Init();
if (error != B_OK) {
delete deliverer;
return error;
}
sDeliverer = deliverer;
return B_OK;
}
void
MessageDeliverer::DeleteDefault()
{
if (sDeliverer) {
delete sDeliverer;
sDeliverer = NULL;
}
}
MessageDeliverer *
MessageDeliverer::Default()
{
return sDeliverer;
}
status_t
MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
bigtime_t timeout)
{
SingleMessagingTargetSet set(target);
return DeliverMessage(message, set, timeout);
}
status_t
MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
bigtime_t timeout)
{
if (message == NULL)
return B_BAD_VALUE;
ssize_t size = message->FlattenedSize();
BStackOrHeapArray<char, 4096> buffer(size);
if (!buffer.IsValid())
return B_NO_MEMORY;
status_t error = message->Flatten(buffer, size);
if (error < B_OK)
return error;
return DeliverMessage(buffer, size, targets, timeout);
}
status_t
MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
MessagingTargetSet &targets, bigtime_t timeout)
{
if (!messageData || messageSize <= 0)
return B_BAD_VALUE;
BReference<Message> messageRef;
BAutolock locker(fLock);
for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
port_id portID;
int32 token;
targets.Next(portID, token);
TargetPort *port = _GetTargetPort(portID, true);
if (!port)
return B_NO_MEMORY;
if (port->IsEmpty()) {
status_t error = BMessage::Private::SendFlattenedMessage((void*)messageData,
messageSize, portID, token, 0);
if (error == B_OK) {
_PutTargetPort(port);
continue;
}
if (error != B_WOULD_BLOCK) {
_PutTargetPort(port);
if (targetIndex == 0 && !targets.HasNext())
return error;
continue;
}
}
if (!messageRef.IsSet()) {
void *data = malloc(messageSize);
if (!data)
return B_NO_MEMORY;
memcpy(data, messageData, messageSize);
Message *message = new(nothrow) Message(data, messageSize, timeout);
if (!message) {
free(data);
return B_NO_MEMORY;
}
messageRef.SetTo(message, true);
}
status_t error = port->PushMessage(messageRef, token);
_PutTargetPort(port);
if (error != B_OK)
return error;
}
return B_OK;
}
MessageDeliverer::TargetPort *
MessageDeliverer::_GetTargetPort(port_id portID, bool create)
{
TargetPortMap::iterator it = fTargetPorts->find(portID);
if (it != fTargetPorts->end())
return it->second;
if (!create)
return NULL;
TargetPort *port = new(nothrow) TargetPort(portID);
if (!port)
return NULL;
(*fTargetPorts)[portID] = port;
return port;
}
void
MessageDeliverer::_PutTargetPort(TargetPort *port)
{
if (!port)
return;
if (port->IsEmpty()) {
fTargetPorts->erase(port->PortID());
delete port;
}
}
status_t
MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
{
status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
message->DataSize(), portID, token, 0);
return error;
}
int32
MessageDeliverer::_DelivererThreadEntry(void *data)
{
return ((MessageDeliverer*)data)->_DelivererThread();
}
int32
MessageDeliverer::_DelivererThread()
{
while (!fTerminating) {
snooze(kRetryDelay);
if (fTerminating)
break;
BAutolock _(fLock);
for (TargetPortMap::iterator it = fTargetPorts->begin();
it != fTargetPorts->end();) {
TargetPort *port = it->second;
bool portError = false;
port->DropTimedOutMessages();
int32 token;
while (Message *message = port->PeekMessage(token)) {
status_t error = B_OK;
error = _SendMessage(message, port->PortID(), token);
if (error == B_OK) {
port->PopMessage();
} else if (error == B_WOULD_BLOCK) {
break;
} else {
portError = true;
break;
}
}
if (portError || port->IsEmpty()) {
TargetPortMap::iterator oldIt = it;
++it;
delete port;
fTargetPorts->erase(oldIt);
} else
++it;
}
}
return 0;
}