root/src/add-ons/kernel/file_systems/netfs/shared/RequestConnection.cpp
// RequestConnection.cpp

#include <new>

#include <OS.h>

#include "Channel.h"
#include "Connection.h"
#include "RequestChannel.h"
#include "RequestConnection.h"
#include "RequestHandler.h"

// DownStreamThread
// Worker servicing one down-stream channel of a RequestConnection: it
// receives requests from the channel and passes them to the request handler
// until it is asked to terminate or a channel error is reported.
class RequestConnection::DownStreamThread {
public:
        DownStreamThread()
                : fThread(-1),
                  fConnection(NULL),
                  fChannel(NULL),
                  fHandler(NULL),
                  fTerminating(false)
        {
        }

        // Stops the worker (see Terminate()) and frees the RequestChannel
        // wrapper allocated by Init().
        ~DownStreamThread()
        {
                Terminate();
                delete fChannel;
        }

        // Wraps `channel` in a RequestChannel and spawns the worker thread.
        // The thread is only started by Run().
        // Returns B_BAD_VALUE if any argument is NULL, B_NO_MEMORY if the
        // RequestChannel cannot be allocated, the (negative) result of
        // spawn_thread() if spawning fails, and B_OK otherwise.
        status_t Init(RequestConnection* connection, Channel* channel,
                RequestHandler* handler)
        {
                if (!connection || !channel || !handler)
                        return B_BAD_VALUE;
                fConnection = connection;
                fHandler = handler;
                fChannel = new(std::nothrow) RequestChannel(channel);
                if (!fChannel)
                        return B_NO_MEMORY;
                fThread = spawn_thread(_LoopEntry, "down stream thread",
                        B_NORMAL_PRIORITY, this);
                if (fThread < 0)
                        return fThread;
                return B_OK;
        }

        // Starts the thread spawned by Init().
        void Run()
        {
                resume_thread(fThread);
        }

        // Asks the loop to stop and, unless called from the worker thread
        // itself, waits for the worker to exit. May be called repeatedly.
        void Terminate()
        {
                fTerminating = true;
                // Work on a copy, so the ID that is compared and the ID that
                // is waited for are guaranteed to be the same.
                thread_id thread = fThread;
                if (thread > 0 && find_thread(NULL) != thread) {
                        int32 result;
                        // Forget the ID only after a successful join. Close()
                        // and the destructor both end up here, and a second
                        // wait on the stale ID is pointless at best. On any
                        // failure the ID is kept, so a later call retries
                        // exactly as before.
                        if (wait_for_thread(thread, &result) == B_OK)
                                fThread = -1;
                }
        }

        // Returns the RequestChannel created by Init() (NULL before that).
        RequestChannel* GetRequestChannel()
        {
                return fChannel;
        }

private:
        // Not implemented. The object owns fChannel and a thread; a copy
        // would delete the same RequestChannel twice.
        DownStreamThread(const DownStreamThread& other);
        DownStreamThread& operator=(const DownStreamThread& other);

        // Thread entry point; `data` is the DownStreamThread passed to
        // spawn_thread() in Init().
        static int32 _LoopEntry(void* data)
        {
                return static_cast<DownStreamThread*>(data)->_Loop();
        }

        // Receives and dispatches requests until fTerminating is set. Each
        // received request is deleted after the handler has seen it. Any
        // receive or handler error is reported to the owning connection,
        // whose return value decides whether the loop stops.
        int32 _Loop()
        {
                while (!fTerminating) {
                        Request* request;
                        status_t error = fChannel->ReceiveRequest(&request);
                        if (error == B_OK) {
                                error = fHandler->HandleRequest(request, fChannel);
                                delete request;
                        }
                        if (error != B_OK)
                                fTerminating = fConnection->DownStreamChannelError(this, error);
                }
                return 0;
        }

private:
        thread_id               fThread;        // worker thread, -1 if none
        RequestConnection*      fConnection;    // owning connection, not owned
        RequestChannel*         fChannel;       // owned, created in Init()
        RequestHandler*         fHandler;       // not owned
        volatile bool           fTerminating;   // set to make _Loop() exit
};


// RequestConnection

// constructor
// Only stores its arguments; no channel is touched and no thread is created
// until Init() is called. The destructor deletes `connection` unconditionally
// and `requestHandler` only when `ownsRequestHandler` is true.
RequestConnection::RequestConnection(Connection* connection,
        RequestHandler* requestHandler, bool ownsRequestHandler)
        : fConnection(connection),
          fRequestHandler(requestHandler),
          fOwnsRequestHandler(ownsRequestHandler),
          fThreads(NULL),
          fThreadCount(0),
          fTerminationCount(0)
{
}

// destructor
// Shuts the connection down and then releases everything this object holds:
// the down-stream thread array, the connection and, if owned, the request
// handler.
RequestConnection::~RequestConnection()
{
        // Close() must come first: it closes the connection and terminates
        // the down-stream threads that still use the objects deleted below.
        Close();
        delete[] fThreads;
        delete fConnection;
        if (fOwnsRequestHandler)
                delete fRequestHandler;
}

// Init
// Creates one DownStreamThread per down-stream channel of the connection and
// starts them all.
// Returns B_BAD_VALUE if the connection or the request handler is missing,
// B_ERROR if the connection has no down-stream channel, B_NO_MEMORY if the
// thread array cannot be allocated, the error of the first thread that fails
// to initialize, and B_OK otherwise.
status_t
RequestConnection::Init()
{
        // both the transport and the handler are mandatory
        if (fConnection == NULL || fRequestHandler == NULL)
                return B_BAD_VALUE;
        // without a down-stream channel there is nothing to listen on
        if (fConnection->CountDownStreamChannels() <= 0)
                return B_ERROR;

        // one worker per down-stream channel
        fThreadCount = fConnection->CountDownStreamChannels();
        fThreads = new(std::nothrow) DownStreamThread[fThreadCount];
        if (fThreads == NULL)
                return B_NO_MEMORY;

        // set up every worker before any of them is started
        int32 index = 0;
        while (index < fThreadCount) {
                status_t error = fThreads[index].Init(this,
                        fConnection->DownStreamChannelAt(index), fRequestHandler);
                if (error != B_OK)
                        return error;
                index++;
        }

        // all workers are ready -- let them run
        for (index = 0; index < fThreadCount; index++)
                fThreads[index].Run();

        return B_OK;
}

// Close
void
RequestConnection::Close()
{
        atomic_add(&fTerminationCount, 1);
        if (fConnection)
                fConnection->Close();
        if (fThreads) {
                for (int32 i = 0; i < fThreadCount; i++)
                        fThreads[i].Terminate();
        }
}

// SendRequest
// Sends `request` and hands the received reply back through `reply`.
// Forwards to _SendRequest() without a reply handler and returns its result.
status_t
RequestConnection::SendRequest(Request* request, Request** reply)
{
        RequestHandler* const noReplyHandler = NULL;
        return _SendRequest(request, reply, noReplyHandler);
}

// SendRequest
// Sends `request` and lets `replyHandler` process the received reply.
// Returns B_BAD_VALUE if `replyHandler` is NULL, otherwise the result of
// _SendRequest().
status_t
RequestConnection::SendRequest(Request* request, RequestHandler* replyHandler)
{
        if (replyHandler != NULL) {
                // the reply goes to the handler only, not back to the caller
                Request** const noReplyStorage = NULL;
                return _SendRequest(request, noReplyStorage, replyHandler);
        }
        return B_BAD_VALUE;
}

// DownStreamChannelError
// Called by a down-stream thread when receiving or handling a request
// failed. The first termination event on this connection is reported to the
// request handler as a ConnectionBrokenRequest carrying `error`; later ones
// (including those after Close()) are only counted.
// Always returns true, which tells the calling thread to stop its loop.
bool
RequestConnection::DownStreamChannelError(DownStreamThread* thread,
        status_t error)
{
        // the counter is bumped unconditionally; only the first bump notifies
        const bool firstTermination
                = (atomic_add(&fTerminationCount, 1) == 0);
        if (!firstTermination || fRequestHandler == NULL)
                return true;

        ConnectionBrokenRequest brokenRequest;
        brokenRequest.error = error;
        fRequestHandler->HandleRequest(&brokenRequest,
                thread->GetRequestChannel());
        return true;
}

// _SendRequest
// Sends `request` over an up-stream channel borrowed from the connection
// and, if `_reply` or `replyHandler` is given, waits for the reply.
//
// request:      the request to send; must not be NULL.
// _reply:       if non-NULL, receives the reply when B_OK is returned. It is
//               set to NULL whenever an error is returned. The reply is not
//               deleted here once it has been handed out.
// replyHandler: if non-NULL, is invoked with the received reply before the
//               reply is handed out or deleted.
//
// If both `_reply` and `replyHandler` are NULL, the request is sent and no
// reply is awaited.
// Returns B_BAD_VALUE if `request` is NULL or there is no connection,
// otherwise the first error reported while getting the channel, sending,
// receiving or handling the reply, and B_OK on success.
status_t
RequestConnection::_SendRequest(Request* request, Request** _reply,
        RequestHandler* replyHandler)
{
        // never leave the caller's reply pointer indeterminate on failure
        if (_reply)
                *_reply = NULL;
        // check parameters; Init() reports a missing connection the same way
        if (!request || !fConnection)
                return B_BAD_VALUE;
        // get a channel
        Channel* channel = NULL;
        status_t error = fConnection->GetUpStreamChannel(&channel);
        if (error != B_OK)
                return error;
        // send the request
        RequestChannel requestChannel(channel);
        error = requestChannel.SendRequest(request);
        // receive the reply
        Request* reply = NULL;
        if (error == B_OK && (_reply || replyHandler)) {
                error = requestChannel.ReceiveRequest(&reply);
                // handle the reply
                if (error == B_OK) {
                        if (replyHandler)
                                error = replyHandler->HandleRequest(reply, &requestChannel);
                        // hand the reply out only on full success; otherwise
                        // it is ours to free
                        if (error == B_OK && _reply)
                                *_reply = reply;
                        else
                                delete reply;
                }
        }
        // cleanup: give the channel back to the connection
        if (fConnection->PutUpStreamChannel(channel) != B_OK) {
                // Ugh! A serious error. Probably insufficient memory.
                delete channel;
        }
        return error;
}