root/src/servers/app/drawing/interface/remote/StreamingRingBuffer.cpp
/*
 * Copyright 2009, 2017, Haiku, Inc.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *              Michael Lotz <mmlr@mlotz.ch>
 */

#include "StreamingRingBuffer.h"

#include <Autolock.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>


#ifdef CLIENT_COMPILE
#define TRACE_ALWAYS(x...)              printf("StreamingRingBuffer: " x)
#else
#define TRACE_ALWAYS(x...)              debug_printf("StreamingRingBuffer: " x)
#endif

#define TRACE(x...)                             /*TRACE_ALWAYS(x)*/
#define TRACE_ERROR(x...)               TRACE_ALWAYS(x)


StreamingRingBuffer::StreamingRingBuffer(size_t bufferSize)
        :
        fReaderWaiting(false),
        fWriterWaiting(false),
        fCancelRead(false),
        fCancelWrite(false),
        fReaderNotifier(-1),
        fWriterNotifier(-1),
        fReaderLocker("StreamingRingBuffer reader"),
        fWriterLocker("StreamingRingBuffer writer"),
        fDataLocker("StreamingRingBuffer data"),
        fBuffer(NULL),
        fBufferSize(bufferSize),
        fReadable(0),
        fReadPosition(0),
        fWritePosition(0)
{
        fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify");
        fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify");

        fBuffer = (uint8 *)malloc(fBufferSize);
        if (fBuffer == NULL)
                fBufferSize = 0;
}


StreamingRingBuffer::~StreamingRingBuffer()
{
        delete_sem(fReaderNotifier);
        delete_sem(fWriterNotifier);
        free(fBuffer);
}


status_t
StreamingRingBuffer::InitCheck()
{
        if (fReaderNotifier < 0)
                return fReaderNotifier;
        if (fWriterNotifier < 0)
                return fWriterNotifier;
        if (fBuffer == NULL)
                return B_NO_MEMORY;

        return B_OK;
}


int32
StreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData)
{
        BAutolock readerLock(fReaderLocker);
        if (!readerLock.IsLocked())
                return B_ERROR;

        BAutolock dataLock(fDataLocker);
        if (!dataLock.IsLocked())
                return B_ERROR;

        int32 readSize = 0;
        while (length > 0) {
                size_t copyLength = min_c(length, fBufferSize - fReadPosition);
                copyLength = min_c(copyLength, fReadable);

                if (copyLength == 0) {
                        if (onlyBlockOnNoData && readSize > 0)
                                return readSize;

                        fReaderWaiting = true;
                        dataLock.Unlock();

                        status_t result;
                        do {
                                TRACE("waiting in reader\n");
                                result = acquire_sem(fReaderNotifier);
                                TRACE("done waiting in reader with status: %#" B_PRIx32 "\n",
                                        result);
                        } while (result == B_INTERRUPTED);

                        if (result != B_OK)
                                return result;

                        if (!dataLock.Lock()) {
                                TRACE_ERROR("failed to acquire data lock\n");
                                return B_ERROR;
                        }

                        if (fCancelRead) {
                                TRACE("read canceled\n");
                                fCancelRead = false;
                                return B_CANCELED;
                        }

                        continue;
                }

                // support discarding input
                if (buffer != NULL) {
                        memcpy(buffer, fBuffer + fReadPosition, copyLength);
                        buffer = (uint8 *)buffer + copyLength;
                }

                fReadPosition = (fReadPosition + copyLength) % fBufferSize;
                fReadable -= copyLength;
                readSize += copyLength;
                length -= copyLength;

                if (fWriterWaiting) {
                        release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE);
                        fWriterWaiting = false;
                }
        }

        return readSize;
}


status_t
StreamingRingBuffer::Write(const void *buffer, size_t length)
{
        BAutolock writerLock(fWriterLocker);
        if (!writerLock.IsLocked())
                return B_ERROR;

        BAutolock dataLock(fDataLocker);
        if (!dataLock.IsLocked())
                return B_ERROR;

        while (length > 0) {
                size_t copyLength = min_c(length, fBufferSize - fWritePosition);
                copyLength = min_c(copyLength, fBufferSize - fReadable);

                if (copyLength == 0) {
                        fWriterWaiting = true;
                        dataLock.Unlock();

                        status_t result;
                        do {
                                TRACE("waiting in writer\n");
                                result = acquire_sem(fWriterNotifier);
                                TRACE("done waiting in writer with status: %#" B_PRIx32 "\n",
                                        result);
                        } while (result == B_INTERRUPTED);

                        if (result != B_OK)
                                return result;

                        if (!dataLock.Lock()) {
                                TRACE_ERROR("failed to acquire data lock\n");
                                return B_ERROR;
                        }

                        if (fCancelWrite) {
                                TRACE("write canceled\n");
                                fCancelWrite = false;
                                return B_CANCELED;
                        }

                        continue;
                }

                memcpy(fBuffer + fWritePosition, buffer, copyLength);
                fWritePosition = (fWritePosition + copyLength) % fBufferSize;
                fReadable += copyLength;

                buffer = (uint8 *)buffer + copyLength;
                length -= copyLength;

                if (fReaderWaiting) {
                        release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE);
                        fReaderWaiting = false;
                }
        }

        return B_OK;
}


void
StreamingRingBuffer::MakeEmpty()
{
        BAutolock dataLock(fDataLocker);
        if (!dataLock.IsLocked())
                return;

        fReadPosition = fWritePosition = 0;
        fReadable = 0;

        if (fWriterWaiting) {
                release_sem_etc(fWriterNotifier, 1, 0);
                fWriterWaiting = false;
                fCancelWrite = true;
        }

        if (fReaderWaiting) {
                release_sem_etc(fReaderNotifier, 1, 0);
                fReaderWaiting = false;
                fCancelRead = true;
        }
}