root/src/add-ons/kernel/network/protocols/unix/UnixFifo.cpp
/*
 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
 * Distributed under the terms of the MIT License.
 */

#include "UnixFifo.h"

#include <new>
#include <string.h>

#include <AutoDeleter.h>

#include <net_stack.h>
#include <util/ring_buffer.h>

#include "unix.h"


#define UNIX_FIFO_DEBUG_LEVEL   0
#define UNIX_DEBUG_LEVEL                UNIX_FIFO_DEBUG_LEVEL
#include "UnixDebug.h"


// #pragma mark - UnixRequest


/*!	Sets up a transfer request over the \a count I/O vectors in \a vecs.

	The vectors, \a ancillaryData and \a address are only referenced, not
	copied. The total request size is the sum of all vector lengths.
	\a clone is stored and later reported to the buffer queue, which uses it
	to peek at data instead of consuming them.
*/
UnixRequest::UnixRequest(const iovec* vecs, size_t count,
                ancillary_data_container* ancillaryData,
                struct sockaddr_storage* address, bool clone)
        :
        fVecs(vecs),
        fVecCount(count),
        fAncillaryData(ancillaryData),
        fTotalSize(0),
        fBytesTransferred(0),
        fVecIndex(0),
        fVecOffset(0),
        fAddress(address),
        fClone(clone)
{
        // Accumulate the lengths of all vectors.
        size_t index = 0;
        while (index < fVecCount) {
                fTotalSize += fVecs[index].iov_len;
                index++;
        }
}


/*!	Accounts for \a size more transferred bytes and moves the current
	position (vector index and offset within that vector) forward
	accordingly.
*/
void
UnixRequest::AddBytesTransferred(size_t size)
{
        fBytesTransferred += size;

        // Step over every vector that the transferred bytes cover completely.
        while (fVecIndex < fVecCount) {
                const size_t leftInVec = fVecs[fVecIndex].iov_len - fVecOffset;
                if (leftInVec > size)
                        break;

                size -= leftInVec;
                fVecIndex++;
                fVecOffset = 0;
        }

        // Whatever remains ends inside the current vector -- if there is one.
        if (fVecIndex < fVecCount)
                fVecOffset += size;
}


bool
UnixRequest::GetCurrentChunk(void*& data, size_t& size)
{
        while (fVecIndex < fVecCount
                        && fVecOffset >= fVecs[fVecIndex].iov_len) {
                fVecIndex++;
                fVecOffset = 0;
        }
        if (fVecIndex >= fVecCount)
                return false;

        data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
        size = fVecs[fVecIndex].iov_len - fVecOffset;
        return true;
}


/*!	Makes the request forget its ancillary data container.
	The container itself is not deleted here.
*/
void
UnixRequest::UnsetAncillaryData()
{
        fAncillaryData = NULL;
}


/*!	Attaches the ancillary data in \a data to the request.

	If the request has no container yet, \a data itself becomes the
	request's container. Otherwise the stack module moves its data into the
	existing container and \a data is deleted.
*/
void
UnixRequest::AddAncillaryData(ancillary_data_container* data)
{
        if (fAncillaryData == NULL) {
                // nothing attached yet -- simply adopt the container
                fAncillaryData = data;
                return;
        }

        gStackModule->move_ancillary_data(data, fAncillaryData);
        gStackModule->delete_ancillary_data_container(data);
}


/*!	Clones the ancillary data in \a data into the request's container,
	creating that container first, if the request doesn't have one yet.

	\return \c B_NO_MEMORY, if the container could not be created, otherwise
		the result of the stack module's clone_ancillary_data().
*/
status_t
UnixRequest::CloneAncillaryData(ancillary_data_container* data)
{
        // create the destination container on demand
        if (fAncillaryData == NULL)
                fAncillaryData = gStackModule->create_ancillary_data_container();

        if (fAncillaryData == NULL)
                return B_NO_MEMORY;

        return gStackModule->clone_ancillary_data(data, fAncillaryData);
}


// #pragma mark - UnixBufferQueue


/*!	Only records \a capacity and \a type. The ring buffer itself is not
	allocated before Init() is called.
*/
UnixBufferQueue::UnixBufferQueue(size_t capacity, UnixFifoType type)
        :
        fBuffer(NULL),
        fCapacity(capacity),
        fType(type)
{
}


/*!	Frees everything that is still queued: the ancillary data entries
	(including their containers), the datagram entries, and the ring buffer.
*/
UnixBufferQueue::~UnixBufferQueue()
{
        while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
                gStackModule->delete_ancillary_data_container(entry->data);
                delete entry;
        }

        // Datagram entries are allocated in Write() and otherwise only freed
        // when Read() consumes them, so the entries of unread datagrams have to
        // be released here.
        while (DatagramEntry* entry = fDatagrams.RemoveHead())
                delete entry;

        delete_ring_buffer(fBuffer);
}


/*!	Allocates the ring buffer with the capacity passed to the constructor.
	\return \c B_OK on success, \c B_NO_MEMORY, if the allocation failed.
*/
status_t
UnixBufferQueue::Init()
{
        fBuffer = create_ring_buffer(fCapacity);
        return fBuffer != NULL ? B_OK : B_NO_MEMORY;
}


/*!	Returns what ring_buffer_readable() reports for the ring buffer. */
size_t
UnixBufferQueue::Readable() const
{
        const size_t available = ring_buffer_readable(fBuffer);
        return available;
}


/*!	Returns what ring_buffer_writable() reports for the ring buffer. */
size_t
UnixBufferQueue::Writable() const
{
        const size_t space = ring_buffer_writable(fBuffer);
        return space;
}


/*!	Transfers buffered data into the vectors of \a request.

	Stream queues deliver as much as is readable and fits into the request.
	Datagram queues deliver at most the datagram at the head of the queue; in
	non-peek mode the part of the datagram that doesn't fit into the request
	is flushed from the buffer, but still added to the request's transferred
	byte count (for MSG_TRUNC), and the sender address is copied to the
	request's address buffer, if there is one.

	If the request is a clone (peek) request, the data stay in the buffer,
	the datagram entry stays queued, and ancillary data belonging to the
	peeked bytes are cloned into the request instead of being moved.

	\return \c B_OK on success, \c B_ERROR, if a datagram queue has no queued
		datagram or the ring buffer delivered 0 bytes, otherwise the negative
		error code returned by the ring buffer functions.
*/
status_t
UnixBufferQueue::Read(UnixRequest& request)
{
        // user space buffers need the *_user_* ring buffer variants
        bool user = gStackModule->is_syscall();

        size_t readable = Readable();
        const bool clone = request.IsClone();
        void* data;
        size_t size;

        DatagramEntry* datagramEntry = NULL;
        if (fType == UnixFifoType::Datagram) {
                datagramEntry = fDatagrams.Head();
                if (datagramEntry == NULL)
                        return B_ERROR;

                // never read beyond the end of the first datagram
                if (datagramEntry->size > readable)
                        TRACE("UnixBufferQueue::Read(): expected to read a datagram of size %lu, "
                                "but only %lu bytes are readable\n", datagramEntry->size, readable);
                else
                        readable = datagramEntry->size;
        }

        while (readable > 0 && request.GetCurrentChunk(data, size)) {
                if (size > readable)
                        size = readable;

                ssize_t bytesRead;
                if (user) {
                        if (clone) {
                                // peeking leaves the data in the buffer, so the bytes
                                // already transferred serve as offset
                                bytesRead = ring_buffer_user_peek(fBuffer, request.BytesTransferred(),
                                        (uint8*)data, size);
                        } else {
                                bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
                        }
                } else {
                        if (clone) {
                                bytesRead = ring_buffer_peek(fBuffer, request.BytesTransferred(),
                                        (uint8*)data, size);
                        } else {
                                bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);
                        }
                }
                if (bytesRead < 0)
                        return bytesRead;
                if (bytesRead == 0)
                        return B_ERROR;

                if (clone) {
                        // Clone ancillary data afterwards
                } else if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
                        // Adjust ancillary data entry offsets, respectively attach the ones
                        // that belong to the read data to the request.
                        size_t offsetDelta = bytesRead;
                        while (entry != NULL && offsetDelta > entry->offset) {
                                // entry data have been read -- add ancillary data to request
                                fAncillaryData.RemoveHead();
                                offsetDelta -= entry->offset;
                                request.AddAncillaryData(entry->data);
                                delete entry;

                                entry = fAncillaryData.Head();
                        }

                        if (entry != NULL)
                                entry->offset -= offsetDelta;
                }

                request.AddBytesTransferred(bytesRead);
                readable -= bytesRead;
        }

        if (!clone && fType == UnixFifoType::Datagram) {
                // The datagram has been consumed, whether or not it fit completely.
                fDatagrams.RemoveHead();

                if (request.Address() != NULL)
                        memcpy(request.Address(), &datagramEntry->address, sizeof(datagramEntry->address));
                delete datagramEntry;

                if (readable > 0) {
                        // Discard the rest of the datagram together with the ancillary
                        // data attached to it.
                        ring_buffer_flush(fBuffer, readable);
                        if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
                                size_t offsetDelta = readable;
                                while (entry != NULL && offsetDelta > entry->offset) {
                                        fAncillaryData.RemoveHead();
                                        offsetDelta -= entry->offset;
                                        gStackModule->delete_ancillary_data_container(entry->data);
                                        delete entry;

                                        entry = fAncillaryData.Head();
                                }

                                if (entry != NULL)
                                        entry->offset -= offsetDelta;
                        }
                        // for MSG_TRUNC on atomic sockets
                        request.AddBytesTransferred(readable);
                }
        }

        if (clone) {
                // Clone the ancillary data of all entries lying within the peeked
                // range. The queue itself is left untouched.
                AncillaryDataEntry* entry = fAncillaryData.Head();
                size_t offsetDelta = request.BytesTransferred();
                while (entry != NULL && offsetDelta > entry->offset) {
                        // Entry offsets are relative to the previous entry, so this
                        // entry's offset has to be consumed before the next entry is
                        // checked. Otherwise entries beyond the peeked range would be
                        // cloned, too.
                        offsetDelta -= entry->offset;

                        // NOTE: A failure to clone is not reported to the caller.
                        request.CloneAncillaryData(entry->data);
                        entry = fAncillaryData.GetNext(entry);
                }
                if (fType == UnixFifoType::Datagram && request.Address() != NULL)
                        memcpy(request.Address(), &datagramEntry->address, sizeof(datagramEntry->address));
        }


        return B_OK;
}


/*!	Transfers as much data as possible from the vectors of \a request into
	the ring buffer.

	For datagram queues a datagram entry recording the sender address and the
	total request size is queued; the whole datagram must fit into the buffer.
	If the request carries ancillary data, an entry for them is queued at the
	current end of the buffered data as soon as the first bytes have been
	written, and the request is made to forget its container, since the queue
	references it from then on.

	\return \c B_OK on success (for stream queues also, if only a part of the
		request could be written), \c B_NO_MEMORY, if an entry could not be
		allocated, \c B_ERROR, if a datagram doesn't fit or the ring buffer
		accepted 0 bytes, otherwise the negative error code returned by the
		ring buffer functions.
*/
status_t
UnixBufferQueue::Write(UnixRequest& request)
{
        // user space buffers need the *_user_* ring buffer variant
        bool user = gStackModule->is_syscall();

        size_t writable = Writable();
        void* data;
        size_t size;

        DatagramEntry* datagramEntry = NULL;
        ObjectDeleter<DatagramEntry> datagramEntryDeleter;
        if (fType == UnixFifoType::Datagram) {
                datagramEntry = new(std::nothrow) DatagramEntry;
                if (datagramEntry == NULL)
                        return B_NO_MEMORY;

                datagramEntryDeleter.SetTo(datagramEntry);
                if (request.Address() != NULL) {
                        memcpy(&datagramEntry->address, request.Address(),
                                sizeof(datagramEntry->address));
                } else {
                        // Read() copies the address to the receiver unconditionally, so
                        // make sure it never sees uninitialized heap memory.
                        memset(&datagramEntry->address, 0,
                                sizeof(datagramEntry->address));
                }
                datagramEntry->size = request.TotalSize();

                // This should have been handled in UnixFifo
                if (writable < datagramEntry->size) {
                        TRACE("UnixBufferQueue::Write(): not enough space for "
                                "datagram of size %lu (%lu bytes left)\n", datagramEntry->size, writable);
                        return B_ERROR;
                }
        }

        // If the request has ancillary data create an entry first.
        AncillaryDataEntry* ancillaryEntry = NULL;
        ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
        if (writable > 0 && request.AncillaryData() != NULL) {
                ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
                if (ancillaryEntry == NULL)
                        return B_NO_MEMORY;

                ancillaryEntryDeleter.SetTo(ancillaryEntry);
                ancillaryEntry->data = request.AncillaryData();
                ancillaryEntry->offset = Readable();

                // The offsets are relative to the previous entry.
                AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
                while (AncillaryDataEntry* entry = it.Next())
                        ancillaryEntry->offset -= entry->offset;
                        // TODO: This is inefficient when the list is long. Rather also
                        // store and maintain the absolute offset of the last queued entry.
        }

        // write as much as we can
        while (writable > 0 && request.GetCurrentChunk(data, size)) {
                if (size > writable)
                        size = writable;

                ssize_t bytesWritten;
                if (user)
                        bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
                else
                        bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);

                if (bytesWritten < 0)
                        return bytesWritten;
                if (bytesWritten == 0)
                        return B_ERROR;

                if (ancillaryEntry != NULL) {
                        // The first bytes are in the buffer -- the queue takes over the
                        // ancillary data now.
                        fAncillaryData.Add(ancillaryEntry);
                        ancillaryEntryDeleter.Detach();
                        request.UnsetAncillaryData();
                        ancillaryEntry = NULL;
                }

                request.AddBytesTransferred(bytesWritten);
                writable -= bytesWritten;
        }

        if (fType == UnixFifoType::Datagram) {
                // publish the datagram boundary
                fDatagrams.Add(datagramEntry);
                datagramEntryDeleter.Detach();
        }

        return B_OK;
}


/*!	Replaces the ring buffer by one with the given \a capacity, moving all
	buffered data over to the new buffer.

	If \a capacity is smaller than the number of currently buffered bytes,
	nothing is changed and \c B_OK is returned nevertheless.

	\return \c B_OK on success, \c B_NO_MEMORY, if the new buffer could not
		be allocated.
*/
status_t
UnixBufferQueue::SetCapacity(size_t capacity)
{
        const size_t buffered = ring_buffer_readable(fBuffer);
        if (capacity < buffered)
                return B_OK;

        ring_buffer* replacement = create_ring_buffer(capacity);
        if (replacement == NULL)
                return B_NO_MEMORY;

        // carry over the pending data, then drop the old buffer
        ring_buffer_move(replacement, buffered, fBuffer);
        delete_ring_buffer(fBuffer);

        fBuffer = replacement;
        fCapacity = capacity;
        return B_OK;
}


// #pragma mark -


/*!	Creates a FIFO of the given \a type whose buffer queue gets the given
	\a capacity. Initializes the lock and both condition variables; the
	buffer itself is set up by Init().
*/
UnixFifo::UnixFifo(size_t capacity, UnixFifoType type)
        :
        fBuffer(capacity, type),
        fReaders(),
        fWriters(),
        fReadRequested(0),
        fWriteRequested(0),
        fShutdown(0),
        fType(type)

{
        mutex_init(&fLock, "unix fifo");
        fReadCondition.Init(this, "unix fifo read");
        fWriteCondition.Init(this, "unix fifo write");
}


/*!	Destroys the FIFO's lock. */
UnixFifo::~UnixFifo()
{
        mutex_destroy(&fLock);
}


/*!	Initializes the buffer queue and returns its result. */
status_t
UnixFifo::Init()
{
        const status_t status = fBuffer.Init();
        return status;
}


/*!	Adds the flags in \a shutdown to the FIFO's shutdown state and, if any
	flag was given, wakes up all waiting readers and writers.
*/
void
UnixFifo::Shutdown(uint32 shutdown)
{
        TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n",
                find_thread(NULL), this, shutdown);

        fShutdown |= shutdown;

        if (shutdown == 0)
                return;

        // Shutting down either end also affects the other, so notify both.
        fReadCondition.NotifyAll();
        fWriteCondition.NotifyAll();
}


/*!	Reads data from the FIFO into the given vectors.

	A request is queued among the readers, _Read() performs the (possibly
	blocking) transfer, and afterwards other waiting readers/writers are
	notified as needed.

	\param vecs The I/O vectors to read into.
	\param vecCount The number of vectors.
	\param _ancillaryData Set to the ancillary data container of the request
		(may be \c NULL).
	\param address Passed on to the request as address buffer.
	\param timeout Passed on to _Read().
	\param peek If \c true, the request is created as clone request.
	\return The number of bytes transferred (capped at \c SSIZE_MAX), if any
		were transferred, otherwise the result of _Read().
		\c UNIX_FIFO_SHUTDOWN, if the FIFO is read-shutdown and empty.
*/
ssize_t
UnixFifo::Read(const iovec* vecs, size_t vecCount,
        ancillary_data_container** _ancillaryData,
        struct sockaddr_storage* address, bigtime_t timeout, bool peek)
{
        TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %ld, %" B_PRIdBIGTIME ") %d\n",
                find_thread(NULL), this, vecs, vecCount, timeout, peek);

        if (IsReadShutdown() && fBuffer.Readable() == 0)
                RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

        // enqueue the request and account for the bytes it asks for
        UnixRequest request(vecs, vecCount, NULL, address, peek);
        fReaders.Add(&request);
        fReadRequested += request.TotalSize();

        status_t error = _Read(request, timeout);

        // dequeue again, remembering whether we were at the head
        const bool wasHead = fReaders.Head() == &request;
        fReaders.Remove(&request);
        fReadRequested -= request.TotalSize();

        // We were first in the queue, there are other readers and data left
        // for them -- let them know.
        if (wasHead && !IsReadShutdown() && !fReaders.IsEmpty()
                        && fBuffer.Readable() > 0) {
                fReadCondition.NotifyAll();
        }

        // Something has been read, so waiting writers might be able to proceed.
        if (request.BytesTransferred() > 0 && !IsWriteShutdown()
                        && !fWriters.IsEmpty()) {
                fWriteCondition.NotifyAll();
        }

        *_ancillaryData = request.AncillaryData();

        if (request.BytesTransferred() == 0)
                RETURN_ERROR(error);

        if (request.BytesTransferred() > SSIZE_MAX)
                RETURN_ERROR(SSIZE_MAX);
        RETURN_ERROR((ssize_t)request.BytesTransferred());
}


/*!	Writes the data in the given vectors to the FIFO.

	A request is queued among the writers, _Write() performs the (possibly
	blocking) transfer, and afterwards other waiting writers/readers are
	notified as needed.

	\param vecs The I/O vectors to write from.
	\param vecCount The number of vectors.
	\param ancillaryData Ancillary data to attach to the request (may be
		\c NULL).
	\param address Passed on to the request as address.
	\param timeout Passed on to _Write().
	\return The number of bytes transferred (capped at \c SSIZE_MAX), if any
		were transferred, otherwise the result of _Write().
		\c UNIX_FIFO_SHUTDOWN, if the FIFO is write-shutdown, \c EPIPE, if it
		is read-shutdown.
*/
ssize_t
UnixFifo::Write(const iovec* vecs, size_t vecCount,
        ancillary_data_container* ancillaryData,
        const struct sockaddr_storage* address, bigtime_t timeout)
{
        TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %ld, %p, %" B_PRIdBIGTIME
                ")\n", find_thread(NULL), this, vecs, vecCount, ancillaryData,
                timeout);

        if (IsWriteShutdown())
                RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

        if (IsReadShutdown())
                RETURN_ERROR(EPIPE);

        // enqueue the request and account for the bytes it wants to write
        UnixRequest request(vecs, vecCount, ancillaryData,
                (struct sockaddr_storage*)address);
        fWriters.Add(&request);
        fWriteRequested += request.TotalSize();

        status_t error = _Write(request, timeout);

        // dequeue again, remembering whether we were at the head
        const bool wasHead = fWriters.Head() == &request;
        fWriters.Remove(&request);
        fWriteRequested -= request.TotalSize();

        // We were first in the queue, there are other writers and space left
        // for them -- let them know.
        if (wasHead && !IsWriteShutdown() && !fWriters.IsEmpty()
                        && fBuffer.Writable() > 0) {
                fWriteCondition.NotifyAll();
        }

        if (request.BytesTransferred() == 0)
                RETURN_ERROR(error);

        // Something has been written, so waiting readers can proceed.
        if (!fReaders.IsEmpty() && !IsReadShutdown())
                fReadCondition.NotifyAll();

        if (request.BytesTransferred() > SSIZE_MAX)
                RETURN_ERROR(SSIZE_MAX);
        RETURN_ERROR((ssize_t)request.BytesTransferred());
}


/*!	Returns the number of buffered bytes that exceed what the queued read
	requests have asked for in total, or 0, if there is no such excess.
*/
size_t
UnixFifo::Readable() const
{
        const size_t buffered = fBuffer.Readable();
        if ((off_t)buffered > fReadRequested)
                return buffered - fReadRequested;
        return 0;
}


/*!	Returns the amount of free buffer space that exceeds what the queued
	write requests want to write in total, or 0, if there is no such excess.
*/
size_t
UnixFifo::Writable() const
{
        const size_t space = fBuffer.Writable();
        if ((off_t)space > fWriteRequested)
                return space - fWriteRequested;
        return 0;
}


/*!	Changes the capacity of the buffer queue.

	\a capacity is clamped to the range
	[\c UNIX_FIFO_MINIMAL_CAPACITY, \c UNIX_FIFO_MAXIMAL_CAPACITY] first. If
	the queue's capacity was actually set, waiting writers are woken up.

	\return \c B_OK on success (also, if the capacity is unchanged), otherwise
		the error returned by the buffer queue.
*/
status_t
UnixFifo::SetBufferCapacity(size_t capacity)
{
        // clamp to the allowed range
        if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
                capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
        else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
                capacity = UNIX_FIFO_MINIMAL_CAPACITY;

        // nothing to do, if the capacity doesn't change
        if (fBuffer.Capacity() == capacity)
                return B_OK;

        const status_t status = fBuffer.SetCapacity(capacity);
        if (status != B_OK)
                return status;

        // there might be more space now -- wake up waiting writers
        const bool notifyWriters = !fWriters.IsEmpty() && !IsWriteShutdown();
        if (notifyWriters)
                fWriteCondition.NotifyAll();

        return B_OK;
}


/*!	Waits until \a request is at the head of the reader queue and data are
	available, then lets the buffer queue read into the request.

	\a timeout is passed to the condition variable waits as absolute timeout;
	the waits are interruptible. A \a timeout of 0 means not to block at all.
	fLock is released while waiting and re-acquired afterwards, so presumably
	the caller is expected to hold it -- TODO confirm against the callers.

	\return The result of the buffer queue's Read() on success,
		\c B_WOULD_BLOCK, if \a timeout is 0 and waiting would be required,
		\c UNIX_FIFO_SHUTDOWN, if the FIFO is read-shutdown and empty, 0, if
		it is write-shutdown and empty, or the error a wait returned.
*/
status_t
UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
{
        // wait for the request to reach the front of the queue
        if (fReaders.Head() != &request && timeout == 0)
                RETURN_ERROR(B_WOULD_BLOCK);

        // Stop waiting also, when the FIFO is read-shutdown and there is nothing
        // left to read.
        while (fReaders.Head() != &request
                && !(IsReadShutdown() && fBuffer.Readable() == 0)) {
                ConditionVariableEntry entry;
                fReadCondition.Add(&entry);

                // the lock must not be held while waiting
                mutex_unlock(&fLock);
                status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
                        timeout);
                mutex_lock(&fLock);

                if (error != B_OK)
                        RETURN_ERROR(error);
        }

        // Nothing to read right now: fail for shutdown and non-blocking requests.
        if (fBuffer.Readable() == 0) {
                if (IsReadShutdown())
                        RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

                if (IsWriteShutdown())
                        RETURN_ERROR(0);

                if (timeout == 0)
                        RETURN_ERROR(B_WOULD_BLOCK);
        }

        // wait for any data to become available
// TODO: Support low water marks!
        while (fBuffer.Readable() == 0
                        && !IsReadShutdown() && !IsWriteShutdown()) {
                ConditionVariableEntry entry;
                fReadCondition.Add(&entry);

                // the lock must not be held while waiting
                mutex_unlock(&fLock);
                status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
                        timeout);
                mutex_lock(&fLock);

                if (error != B_OK)
                        RETURN_ERROR(error);
        }

        // The wait may have ended due to a shutdown rather than due to new data.
        if (fBuffer.Readable() == 0) {
                if (IsReadShutdown())
                        RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
                if (IsWriteShutdown())
                        RETURN_ERROR(0);
        }

        RETURN_ERROR(fBuffer.Read(request));
}


/*!	Waits until \a request is at the head of the writer queue and then
	writes it to the buffer queue, waiting for space as often as needed until
	the request has been written completely.

	A \a timeout of 0 delegates to _WriteNonBlocking(). Otherwise \a timeout
	is passed to the condition variable waits as absolute timeout; the waits
	are interruptible. fLock is released while waiting and re-acquired
	afterwards, so presumably the caller is expected to hold it -- TODO
	confirm against the callers.

	\return \c B_OK (respectively 0) on success, \c UNIX_FIFO_SHUTDOWN, if
		the FIFO is write-shutdown, \c EPIPE, if it is read-shutdown, or the
		error returned by a wait or by the buffer queue's Write().
*/
status_t
UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
{
        if (timeout == 0)
                RETURN_ERROR(_WriteNonBlocking(request));

        // wait for the request to reach the front of the queue
        while (fWriters.Head() != &request && !IsWriteShutdown()) {
                ConditionVariableEntry entry;
                fWriteCondition.Add(&entry);

                // the lock must not be held while waiting
                mutex_unlock(&fLock);
                status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
                        timeout);
                mutex_lock(&fLock);

                if (error != B_OK)
                        RETURN_ERROR(error);
        }

        if (IsWriteShutdown())
                RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

        if (IsReadShutdown())
                RETURN_ERROR(EPIPE);

        // an empty request is done already
        if (request.TotalSize() == 0)
                return 0;

        status_t error = B_OK;

        while (error == B_OK && request.BytesRemaining() > 0) {
                // wait for any space to become available
                // (_MinimumWritableSize() is the whole request for datagram FIFOs
                // and a single byte otherwise)
                while (error == B_OK && fBuffer.Writable() < _MinimumWritableSize(request)
                                && !IsWriteShutdown() && !IsReadShutdown()) {
                        ConditionVariableEntry entry;
                        fWriteCondition.Add(&entry);

                        // the lock must not be held while waiting
                        mutex_unlock(&fLock);
                        error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
                        mutex_lock(&fLock);

                        if (error != B_OK)
                                RETURN_ERROR(error);
                }

                // The wait may have ended due to a shutdown rather than due to free
                // space.
                if (IsWriteShutdown())
                        RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

                if (IsReadShutdown())
                        RETURN_ERROR(EPIPE);

                // write as much as we can
                error = fBuffer.Write(request);

                if (error == B_OK) {
// TODO: Whenever we've successfully written a part, we should reset the
// timeout!
                }
        }

        RETURN_ERROR(error);
}


/*!	Writes \a request to the buffer queue without ever waiting.

	\return \c B_WOULD_BLOCK, if the request is not at the head of the writer
		queue or the buffer has less free space than _MinimumWritableSize()
		demands, 0 for an empty request, otherwise the result of the buffer
		queue's Write().
*/
status_t
UnixFifo::_WriteNonBlocking(UnixRequest& request)
{
        // Waiting is not an option: the request must be the first in the queue
        // and there must be sufficient space right away.
        const bool isHead = fWriters.Head() == &request;
        if (!isHead || fBuffer.Writable() < _MinimumWritableSize(request))
                RETURN_ERROR(B_WOULD_BLOCK);

        // nothing to do for an empty request
        if (request.TotalSize() == 0)
                return 0;

        // Write as much as we can.
        RETURN_ERROR(fBuffer.Write(request));
}


/*!	Returns how much free buffer space is needed at least, before anything
	of \a request is written: the total request size for datagram FIFOs,
	1 byte for all other types.
*/
size_t
UnixFifo::_MinimumWritableSize(const UnixRequest& request) const
{
        if (fType == UnixFifoType::Datagram)
                return request.TotalSize();

        // stream (and any other type)
        return 1;
}