root/src/add-ons/kernel/network/protocols/unix/UnixStreamEndpoint.cpp
/*
 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
 * Distributed under the terms of the MIT License.
 */


#include "UnixStreamEndpoint.h"

#include <stdio.h>
#include <sys/stat.h>

#include <AutoDeleter.h>

#include <vfs.h>

#include "UnixAddressManager.h"
#include "UnixFifo.h"


#define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL        0
#define UNIX_DEBUG_LEVEL                                        UNIX_STREAM_ENDPOINT_DEBUG_LEVEL
#include "UnixDebug.h"


// Note on locking order (outermost -> innermost):
// UnixStreamEndpoint: connecting -> listening -> child
// -> UnixFifo (never lock more than one at a time)
// -> UnixAddressManager


UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket, bool atomic)
        :
        UnixEndpoint(socket),
        fPeerEndpoint(NULL),
        fReceiveFifo(NULL),
        fState(unix_stream_endpoint_state::Closed),
        fAcceptSemaphore(-1),
        fIsChild(false),
        fWasConnected(false),
        fAtomic(atomic)
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n",
                find_thread(NULL), this);
}


UnixStreamEndpoint::~UnixStreamEndpoint()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n",
                find_thread(NULL), this);
}


status_t
UnixStreamEndpoint::Init()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL),
                this);

        RETURN_ERROR(B_OK);
}


void
UnixStreamEndpoint::Uninit()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL),
                this);

        // check whether we're closed
        UnixStreamEndpointLocker locker(this);
        bool closed = (fState == unix_stream_endpoint_state::Closed);
        locker.Unlock();

        if (!closed) {
                // That probably means, we're a child endpoint of a listener and
                // have been fully connected, but not yet accepted. Our Close()
                // hook isn't called in this case. Do it manually.
                Close();
        }

        ReleaseReference();
}


status_t
UnixStreamEndpoint::Open()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL),
                this);

        status_t error = ProtocolSocket::Open();
        if (error != B_OK)
                RETURN_ERROR(error);

        fState = unix_stream_endpoint_state::NotConnected;

        RETURN_ERROR(B_OK);
}


status_t
UnixStreamEndpoint::Close()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL),
                this);

        UnixStreamEndpointLocker locker(this);

        if (fState == unix_stream_endpoint_state::Connected) {
                BReference<UnixStreamEndpoint> peerReference;
                UnixStreamEndpointLocker peerLocker;
                if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
                        // The peer may be destroyed when we release it,
                        // so grab a reference for the locker's sake.
                        peerReference.SetTo(peerLocker.Get(), false);

                        // We're still connected. Disconnect both endpoints!
                        fPeerEndpoint->_Disconnect();
                        _Disconnect();
                }
        }

        if (fState == unix_stream_endpoint_state::Listening)
                _StopListening();

        _Unbind();

        fState = unix_stream_endpoint_state::Closed;
        RETURN_ERROR(B_OK);
}


status_t
UnixStreamEndpoint::Free()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL),
                this);

        UnixStreamEndpointLocker locker(this);

        _UnsetReceiveFifo();

        RETURN_ERROR(B_OK);
}


status_t
UnixStreamEndpoint::Bind(const struct sockaddr* _address)
{
        if (_address->sa_family != AF_UNIX)
                RETURN_ERROR(EAFNOSUPPORT);

        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n",
                find_thread(NULL), this,
                ConstSocketAddress(&gAddressModule, _address).AsString().Data());

        const sockaddr_un* address = (const sockaddr_un*)_address;

        UnixStreamEndpointLocker endpointLocker(this);

        if (fState != unix_stream_endpoint_state::NotConnected || IsBound())
                RETURN_ERROR(B_BAD_VALUE);

        RETURN_ERROR(_Bind(address));
}


status_t
UnixStreamEndpoint::Unbind()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL),
                this);

        UnixStreamEndpointLocker endpointLocker(this);

        RETURN_ERROR(_Unbind());
}


status_t
UnixStreamEndpoint::Listen(int backlog)
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL),
                this, backlog);

        UnixStreamEndpointLocker endpointLocker(this);

        if (!IsBound())
                RETURN_ERROR(EDESTADDRREQ);
        if (fState != unix_stream_endpoint_state::NotConnected
                && fState != unix_stream_endpoint_state::Listening)
                RETURN_ERROR(EINVAL);

        gSocketModule->set_max_backlog(socket, backlog);

        if (fState == unix_stream_endpoint_state::NotConnected) {
                fAcceptSemaphore = create_sem(0, "unix accept");
                if (fAcceptSemaphore < 0)
                        RETURN_ERROR(ENOBUFS);

                _UnsetReceiveFifo();

                fCredentials.pid = getpid();
                fCredentials.uid = geteuid();
                fCredentials.gid = getegid();

                fState = unix_stream_endpoint_state::Listening;
        }

        RETURN_ERROR(B_OK);
}


status_t
UnixStreamEndpoint::Connect(const struct sockaddr* _address)
{
        if (_address->sa_family != AF_UNIX)
                RETURN_ERROR(EAFNOSUPPORT);

        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n",
                find_thread(NULL), this,
                ConstSocketAddress(&gAddressModule, _address).AsString().Data());

        const sockaddr_un* address = (const sockaddr_un*)_address;

        UnixStreamEndpointLocker endpointLocker(this);

        if (fState == unix_stream_endpoint_state::Connected)
                RETURN_ERROR(EISCONN);

        if (fState != unix_stream_endpoint_state::NotConnected)
                RETURN_ERROR(B_BAD_VALUE);
// TODO: If listening, we could set the backlog to 0 and connect.

        // check the address first
        UnixAddress unixAddress;

        if (address->sun_path[0] == '\0') {
                // internal address space (or empty address)
                int32 internalID;
                if (UnixAddress::IsEmptyAddress(*address))
                        RETURN_ERROR(B_BAD_VALUE);

                internalID = UnixAddress::InternalID(*address);
                if (internalID < 0)
                        RETURN_ERROR(internalID);

                unixAddress.SetTo(internalID);
        } else {
                // FS address space
                size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
                if (pathLen == 0 || pathLen == sizeof(address->sun_path))
                        RETURN_ERROR(B_BAD_VALUE);

                struct stat st;
                status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
                        !gStackModule->is_syscall());
                if (error != B_OK)
                        RETURN_ERROR(error);

                if (!S_ISSOCK(st.st_mode))
                        RETURN_ERROR(B_BAD_VALUE);

                unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
        }

        // get the peer endpoint
        UnixAddressManagerLocker addressLocker(gAddressManager);
        UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress);
        if (listeningUnixEndpoint == NULL)
                RETURN_ERROR(ECONNREFUSED);
        UnixStreamEndpoint* listeningEndpoint
                = dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint);
        if (listeningEndpoint == NULL)
                RETURN_ERROR(EPROTOTYPE);
        BReference<UnixStreamEndpoint> peerReference(listeningEndpoint);
        addressLocker.Unlock();

        UnixStreamEndpointLocker peerLocker(listeningEndpoint);

        if (!listeningEndpoint->IsBound()
                || listeningEndpoint->fState != unix_stream_endpoint_state::Listening
                || listeningEndpoint->fAddress != unixAddress) {
                RETURN_ERROR(ECONNREFUSED);
        }

        // Allocate FIFOs for us and the socket we're going to spawn. We do that
        // now, so that the mess we need to cleanup, if allocating them fails, is
        // harmless.
        UnixFifoType type = fAtomic ? UnixFifoType::Datagram : UnixFifoType::Stream;
        UnixFifo* fifo = new(nothrow) UnixFifo(socket->receive.buffer_size, type);
        UnixFifo* peerFifo = new(nothrow) UnixFifo(socket->send.buffer_size, type);
        ObjectDeleter<UnixFifo> fifoDeleter(fifo);
        ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);

        status_t error;
        if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
                return error;

        // spawn new endpoint for accept()
        net_socket* newSocket;
        error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
                &newSocket);
        if (error != B_OK)
                RETURN_ERROR(error);

        // init connected peer endpoint
        UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol;

        UnixStreamEndpointLocker connectedLocker(connectedEndpoint);

        connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);

        // update our attributes
        _UnsetReceiveFifo();

        fPeerEndpoint = connectedEndpoint;
        PeerAddress().SetTo(&connectedEndpoint->socket->address);
        fPeerEndpoint->AcquireReference();
        fReceiveFifo = fifo;

        fCredentials.pid = getpid();
        fCredentials.uid = geteuid();
        fCredentials.gid = getegid();

        fifoDeleter.Detach();
        peerFifoDeleter.Detach();

        fState = unix_stream_endpoint_state::Connected;
        fWasConnected = true;

        gSocketModule->set_connected(Socket());

        release_sem(listeningEndpoint->fAcceptSemaphore);

        connectedLocker.Unlock();
        peerLocker.Unlock();
        endpointLocker.Unlock();

        RETURN_ERROR(B_OK);
}


status_t
UnixStreamEndpoint::Accept(net_socket** _acceptedSocket)
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL),
                this);

        bigtime_t timeout = absolute_timeout(socket->receive.timeout);
        if (gStackModule->is_restarted_syscall())
                timeout = gStackModule->restore_syscall_restart_timeout();
        else
                gStackModule->store_syscall_restart_timeout(timeout);

        UnixStreamEndpointLocker locker(this);

        status_t error;
        do {
                locker.Unlock();

                error = acquire_sem_etc(fAcceptSemaphore, 1,
                        B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
                if (error < B_OK)
                        break;

                locker.Lock();
                error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
        } while (error != B_OK);

        if (error == B_TIMED_OUT && timeout == 0) {
                // translate non-blocking timeouts to the correct error code
                error = B_WOULD_BLOCK;
        }

        RETURN_ERROR(error);
}


ssize_t
UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount,
        ancillary_data_container* ancillaryData,
        const struct sockaddr* address, socklen_t addressLength, int flags)
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n",
                find_thread(NULL), this, vecs, vecCount, ancillaryData);

        // TODO: handle MSG_EOR
        bool atomic = (flags & MSG_EOR) != 0;
        if ((atomic && !fAtomic) || (flags & ~(MSG_DONTWAIT | MSG_EOR)) != 0)
                return EOPNOTSUPP;

        bigtime_t timeout = 0;
        if ((flags & MSG_DONTWAIT) == 0) {
                timeout = absolute_timeout(socket->send.timeout);
                if (gStackModule->is_restarted_syscall())
                        timeout = gStackModule->restore_syscall_restart_timeout();
                else
                        gStackModule->store_syscall_restart_timeout(timeout);
        }

        UnixStreamEndpointLocker locker(this);

        BReference<UnixStreamEndpoint> peerReference;
        UnixStreamEndpointLocker peerLocker;

        status_t error = _LockConnectedEndpoints(locker, peerLocker);
        if (error != B_OK)
                RETURN_ERROR(error);

        UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
        peerReference.SetTo(peerEndpoint);

        // lock the peer's FIFO
        UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
        BReference<UnixFifo> _(peerFifo);
        UnixFifoLocker fifoLocker(peerFifo);

        // unlock endpoints
        locker.Unlock();
        peerLocker.Unlock();

        ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout);

        // Notify select()ing readers, if we successfully wrote anything.
        size_t readable = peerFifo->Readable();
        bool notifyRead = (error == B_OK && readable > 0
                && !peerFifo->IsReadShutdown());

        // Notify select()ing writers, if we failed to write anything and there's
        // still room to write.
        size_t writable = peerFifo->Writable();
        bool notifyWrite = (error != B_OK && writable > 0
                && !peerFifo->IsWriteShutdown());

        // re-lock our endpoint (unlock FIFO to respect locking order)
        fifoLocker.Unlock();
        locker.Lock();

        bool peerLocked = (fPeerEndpoint == peerEndpoint
                && _LockConnectedEndpoints(locker, peerLocker) == B_OK);

        // send notifications
        if (peerLocked && notifyRead)
                gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
        if (notifyWrite)
                gSocketModule->notify(socket, B_SELECT_WRITE, writable);

        switch (result) {
                case UNIX_FIFO_SHUTDOWN:
                        if (fPeerEndpoint == peerEndpoint
                                        && fState == unix_stream_endpoint_state::Connected) {
                                // Orderly write shutdown on our side.
                                // Note: Linux and Solaris also send a SIGPIPE, but according
                                // the send() specification that shouldn't be done.
                                result = EPIPE;
                        } else {
                                // The FD has been closed.
                                result = EBADF;
                        }
                        break;
                case EPIPE:
                        // The socket module will generate SIGPIPE for us, if necessary.
                        break;
                case B_TIMED_OUT:
                        // Translate non-blocking timeouts to the correct error code.
                        if (timeout == 0)
                                result = B_WOULD_BLOCK;
                        break;
        }

        RETURN_ERROR(result);
}


ssize_t
UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount,
        ancillary_data_container** _ancillaryData, struct sockaddr* _address,
        socklen_t* _addressLength, int flags)
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n",
                find_thread(NULL), this, vecs, vecCount);

        // TODO: handle MSG_WAITALL
        if ((flags & ~(MSG_DONTWAIT | MSG_PEEK | MSG_WAITALL)) != 0)
                return EOPNOTSUPP;

        bigtime_t timeout = 0;
        if ((flags & MSG_DONTWAIT) == 0) {
                timeout = absolute_timeout(socket->receive.timeout);
                if (gStackModule->is_restarted_syscall())
                        timeout = gStackModule->restore_syscall_restart_timeout();
                else
                        gStackModule->store_syscall_restart_timeout(timeout);
        }

        UnixStreamEndpointLocker locker(this);

        // We can read as long as we have a FIFO. I.e. we are still connected, or
        // disconnected and not yet reconnected/listening/closed.
        if (fReceiveFifo == NULL)
                RETURN_ERROR(ENOTCONN);

        UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
        BReference<UnixStreamEndpoint> peerReference(peerEndpoint);

        // Copy the peer address upfront. This way, if we read something, we don't
        // get into a potential race with Close().
        if (_address != NULL) {
                socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
                memcpy(_address, &socket->peer, addrLen);
                *_addressLength = addrLen;
        }

        // lock our FIFO
        UnixFifo* fifo = fReceiveFifo;
        BReference<UnixFifo> _(fifo);
        UnixFifoLocker fifoLocker(fifo);

        // unlock endpoint
        locker.Unlock();

        ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout,
                (flags & MSG_PEEK) != 0);

        // Notify select()ing writers, if we successfully read anything.
        size_t writable = fifo->Writable();
        bool notifyWrite = (result >= 0 && writable > 0
                && !fifo->IsWriteShutdown());

        // Notify select()ing readers, if we failed to read anything and there's
        // still something left to read.
        size_t readable = fifo->Readable();
        bool notifyRead = (result < 0 && readable > 0
                && !fifo->IsReadShutdown());

        // re-lock our endpoint (unlock FIFO to respect locking order)
        fifoLocker.Unlock();
        locker.Lock();

        UnixStreamEndpointLocker peerLocker;
        bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
                && _LockConnectedEndpoints(locker, peerLocker) == B_OK);

        // send notifications
        if (notifyRead)
                gSocketModule->notify(socket, B_SELECT_READ, readable);
        if (peerLocked && notifyWrite)
                gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);

        switch (result) {
                case UNIX_FIFO_SHUTDOWN:
                        // Either our socket was closed or read shutdown.
                        if (fState == unix_stream_endpoint_state::Closed) {
                                // The FD has been closed.
                                result = EBADF;
                        } else {
                                // if (fReceiveFifo == fifo) {
                                //              Orderly shutdown or the peer closed the connection.
                                // } else {
                                //              Weird case: Peer closed connection and we are already
                                //              reconnected (or listening).
                                // }
                                result = 0;
                        }
                        break;
                case B_TIMED_OUT:
                        // translate non-blocking timeouts to the correct error code
                        if (timeout == 0)
                                result = B_WOULD_BLOCK;
                        break;
        }

        RETURN_ERROR(result);
}


ssize_t
UnixStreamEndpoint::Sendable()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL),
                this);

        UnixStreamEndpointLocker locker(this);
        UnixStreamEndpointLocker peerLocker;

        status_t error = _LockConnectedEndpoints(locker, peerLocker);
        if (error != B_OK)
                RETURN_ERROR(error);

        // lock the peer's FIFO
        UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
        UnixFifoLocker fifoLocker(peerFifo);

        RETURN_ERROR(peerFifo->Writable());
}


ssize_t
UnixStreamEndpoint::Receivable()
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL),
                this);

        UnixStreamEndpointLocker locker(this);

        if (fState == unix_stream_endpoint_state::Listening)
                return gSocketModule->count_connected(socket);

        if (fState != unix_stream_endpoint_state::Connected)
                RETURN_ERROR(ENOTCONN);

        UnixFifoLocker fifoLocker(fReceiveFifo);
        ssize_t readable = fReceiveFifo->Readable();
        if (readable == 0 && (fReceiveFifo->IsWriteShutdown()
                        || fReceiveFifo->IsReadShutdown())) {
                RETURN_ERROR(ENOTCONN);
        }
        RETURN_ERROR(readable);
}


status_t
UnixStreamEndpoint::SetReceiveBufferSize(size_t size)
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n",
                find_thread(NULL), this, size);

        UnixStreamEndpointLocker locker(this);

        if (fReceiveFifo == NULL)
                return B_OK;

        UnixFifoLocker fifoLocker(fReceiveFifo);
        return fReceiveFifo->SetBufferCapacity(size);
}


status_t
UnixStreamEndpoint::GetPeerCredentials(ucred* credentials)
{
        UnixStreamEndpointLocker locker(this);
        UnixStreamEndpointLocker peerLocker;

        status_t error = _LockConnectedEndpoints(locker, peerLocker);
        if (error != B_OK)
                RETURN_ERROR(error);

        *credentials = fPeerEndpoint->fCredentials;

        return B_OK;
}


status_t
UnixStreamEndpoint::Shutdown(int direction)
{
        TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n",
                find_thread(NULL), this, direction);

        uint32 shutdown;
        uint32 peerShutdown;

        // translate the direction into shutdown flags for our and the peer fifo
        switch (direction) {
                case SHUT_RD:
                        shutdown = UNIX_FIFO_SHUTDOWN_READ;
                        peerShutdown = 0;
                        break;
                case SHUT_WR:
                        shutdown = 0;
                        peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
                        break;
                case SHUT_RDWR:
                        shutdown = UNIX_FIFO_SHUTDOWN_READ;
                        peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
                        break;
                default:
                        RETURN_ERROR(B_BAD_VALUE);
        }

        // lock endpoints
        UnixStreamEndpointLocker locker(this);
        UnixStreamEndpointLocker peerLocker;

        status_t error = _LockConnectedEndpoints(locker, peerLocker);
        if (error != B_OK)
                RETURN_ERROR(error);

        // shutdown our FIFO
        fReceiveFifo->Lock();
        fReceiveFifo->Shutdown(shutdown);
        fReceiveFifo->Unlock();

        // shutdown peer FIFO
        fPeerEndpoint->fReceiveFifo->Lock();
        fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
        fPeerEndpoint->fReceiveFifo->Unlock();

        // send select notifications
        if (direction == SHUT_RD || direction == SHUT_RDWR) {
                gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
                gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
        }
        if (direction == SHUT_WR || direction == SHUT_RDWR) {
                gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
                gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
        }

        RETURN_ERROR(B_OK);
}


void
UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint,
        UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo)
{
        ProtocolSocket::Open();

        fIsChild = true;
        fPeerEndpoint = connectingEndpoint;
        fPeerEndpoint->AcquireReference();

        fReceiveFifo = fifo;

        PeerAddress().SetTo(&connectingEndpoint->socket->address);

        fCredentials = listeningEndpoint->fCredentials;

        fState = unix_stream_endpoint_state::Connected;

        gSocketModule->set_connected(Socket());
}


void
UnixStreamEndpoint::_Disconnect()
{
        // Both endpoints must be locked.

        // Write shutdown the receive FIFO.
        fReceiveFifo->Lock();
        fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
        fReceiveFifo->Unlock();

        // select() notification.
        gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
        gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);

        // Unset the peer endpoint.
        fPeerEndpoint->ReleaseReference();
        fPeerEndpoint = NULL;

        // We're officially disconnected.
// TODO: Deal with non accept()ed connections correctly!
        fIsChild = false;
        fState = unix_stream_endpoint_state::NotConnected;
}


status_t
UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker,
        UnixStreamEndpointLocker& peerLocker)
{
        if (fState != unix_stream_endpoint_state::Connected)
                RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);

        // We need to lock the peer, too. Get a reference -- we might need to
        // unlock ourselves to get the locking order right.
        BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint);
        UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;

        if (fIsChild) {
                // We're the child, but locking order is the other way around.
                locker.Unlock();
                peerLocker.SetTo(peerEndpoint, false);
                locker.Lock();

                // recheck our state, also whether the peer is still the same
                if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) {
                        peerLocker.Unset();
                        RETURN_ERROR(ENOTCONN);
                }
        } else
                peerLocker.SetTo(peerEndpoint, false);

        RETURN_ERROR(B_OK);
}


status_t
UnixStreamEndpoint::_Unbind()
{
        if (fState == unix_stream_endpoint_state::Connected
                || fState == unix_stream_endpoint_state::Listening)
                RETURN_ERROR(B_BAD_VALUE);

        if (IsBound())
                RETURN_ERROR(UnixEndpoint::_Unbind());

        RETURN_ERROR(B_OK);
}


void
UnixStreamEndpoint::_UnsetReceiveFifo()
{
        if (fReceiveFifo) {
                fReceiveFifo->ReleaseReference();
                fReceiveFifo = NULL;
        }
}


void
UnixStreamEndpoint::_StopListening()
{
        if (fState == unix_stream_endpoint_state::Listening) {
                delete_sem(fAcceptSemaphore);
                fAcceptSemaphore = -1;
                fState = unix_stream_endpoint_state::NotConnected;
        }
}