#include <stdio.h>
#include <OS.h>
#include "Packet.h"
#include "PacketQueue.h"
#define TRACE_PACKET_QUEUE
#ifdef TRACE_PACKET_QUEUE
#define TRACE printf
#else
#define TRACE(a...)
#endif
PacketQueue::PacketQueue(int max_packets)
: fQueue(new Packet* [max_packets])
, fSem(create_sem(0, "packet queue sem"))
, fTerminate(false)
, fWriteIndex(0)
, fReadIndex(0)
, fMaxPackets(max_packets)
, fPacketCount(0)
{
}
PacketQueue::~PacketQueue()
{
Empty();
delete_sem(fSem);
delete [] fQueue;
}
void
PacketQueue::Empty()
{
while (fPacketCount--) {
delete fQueue[fReadIndex];
fReadIndex = (fReadIndex + 1) % fMaxPackets;
}
}
status_t
PacketQueue::Insert(Packet *packet)
{
if (fTerminate) {
return B_NOT_ALLOWED;
}
if (atomic_add(&fPacketCount, 1) == fMaxPackets) {
atomic_add(&fPacketCount, -1);
return B_WOULD_BLOCK;
}
fQueue[fWriteIndex] = packet;
fWriteIndex = (fWriteIndex + 1) % fMaxPackets;
release_sem(fSem);
return B_OK;
}
status_t
PacketQueue::Remove(Packet **packet)
{
if (fTerminate) {
return B_NOT_ALLOWED;
}
if (acquire_sem(fSem) != B_OK)
return B_ERROR;
if (fTerminate) {
return B_NOT_ALLOWED;
}
*packet = fQueue[fReadIndex];
atomic_add(&fPacketCount, -1);
fReadIndex = (fReadIndex + 1) % fMaxPackets;
return B_OK;
}
void
PacketQueue::Flush(bigtime_t timeout)
{
if (fTerminate) {
return;
}
timeout += system_time();
while (acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT, timeout) == B_OK) {
if (fTerminate) {
return;
}
Packet *packet = fQueue[fReadIndex];
atomic_add(&fPacketCount, -1);
fReadIndex = (fReadIndex + 1) % fMaxPackets;
delete packet;
}
}
status_t
PacketQueue::Peek(Packet **packet)
{
if (fTerminate) {
return B_NOT_ALLOWED;
}
if (acquire_sem(fSem) != B_OK)
return B_ERROR;
if (fTerminate) {
return B_NOT_ALLOWED;
}
*packet = new Packet(*fQueue[fReadIndex]);
release_sem(fSem);
return B_OK;
}
void
PacketQueue::Terminate()
{
fTerminate = true;
release_sem(fSem);
}
void
PacketQueue::Restart()
{
Empty();
delete_sem(fSem);
fSem = create_sem(0, "packet queue sem");
fTerminate = false;
fWriteIndex = 0;
fReadIndex = 0;
fPacketCount = 0;
}