/* * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de. * Distributed under the terms of the MIT License. */ #include "UnixStreamEndpoint.h" #include #include #include #include #include "UnixAddressManager.h" #include "UnixFifo.h" #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 0 #define UNIX_DEBUG_LEVEL UNIX_STREAM_ENDPOINT_DEBUG_LEVEL #include "UnixDebug.h" // Note on locking order (outermost -> innermost): // UnixStreamEndpoint: connecting -> listening -> child // -> UnixFifo (never lock more than one at a time) // -> UnixAddressManager UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket, bool atomic) : UnixEndpoint(socket), fPeerEndpoint(NULL), fReceiveFifo(NULL), fState(unix_stream_endpoint_state::Closed), fAcceptSemaphore(-1), fIsChild(false), fWasConnected(false), fAtomic(atomic) { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n", find_thread(NULL), this); } UnixStreamEndpoint::~UnixStreamEndpoint() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n", find_thread(NULL), this); } status_t UnixStreamEndpoint::Init() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL), this); RETURN_ERROR(B_OK); } void UnixStreamEndpoint::Uninit() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL), this); // check whether we're closed UnixStreamEndpointLocker locker(this); bool closed = (fState == unix_stream_endpoint_state::Closed); locker.Unlock(); if (!closed) { // That probably means, we're a child endpoint of a listener and // have been fully connected, but not yet accepted. Our Close() // hook isn't called in this case. Do it manually. Close(); } ReleaseReference(); } status_t UnixStreamEndpoint::Open() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL), this); status_t error = ProtocolSocket::Open(); if (error != B_OK) RETURN_ERROR(error); fState = unix_stream_endpoint_state::NotConnected; RETURN_ERROR(B_OK); } status_t UnixStreamEndpoint::Close() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL), this); UnixStreamEndpointLocker locker(this); if (fState == unix_stream_endpoint_state::Connected) { BReference peerReference; UnixStreamEndpointLocker peerLocker; if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) { // The peer may be destroyed when we release it, // so grab a reference for the locker's sake. peerReference.SetTo(peerLocker.Get(), false); // We're still connected. Disconnect both endpoints! fPeerEndpoint->_Disconnect(); _Disconnect(); } } if (fState == unix_stream_endpoint_state::Listening) _StopListening(); _Unbind(); fState = unix_stream_endpoint_state::Closed; RETURN_ERROR(B_OK); } status_t UnixStreamEndpoint::Free() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL), this); UnixStreamEndpointLocker locker(this); _UnsetReceiveFifo(); RETURN_ERROR(B_OK); } status_t UnixStreamEndpoint::Bind(const struct sockaddr* _address) { if (_address->sa_family != AF_UNIX) RETURN_ERROR(EAFNOSUPPORT); TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n", find_thread(NULL), this, ConstSocketAddress(&gAddressModule, _address).AsString().Data()); const sockaddr_un* address = (const sockaddr_un*)_address; UnixStreamEndpointLocker endpointLocker(this); if (fState != unix_stream_endpoint_state::NotConnected || IsBound()) RETURN_ERROR(B_BAD_VALUE); RETURN_ERROR(_Bind(address)); } status_t UnixStreamEndpoint::Unbind() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL), this); UnixStreamEndpointLocker endpointLocker(this); RETURN_ERROR(_Unbind()); } status_t UnixStreamEndpoint::Listen(int backlog) { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL), this, backlog); UnixStreamEndpointLocker endpointLocker(this); if (!IsBound()) RETURN_ERROR(EDESTADDRREQ); if (fState != unix_stream_endpoint_state::NotConnected && fState != unix_stream_endpoint_state::Listening) RETURN_ERROR(EINVAL); gSocketModule->set_max_backlog(socket, backlog); if (fState == unix_stream_endpoint_state::NotConnected) { fAcceptSemaphore = create_sem(0, "unix accept"); if (fAcceptSemaphore < 0) RETURN_ERROR(ENOBUFS); _UnsetReceiveFifo(); fCredentials.pid = getpid(); fCredentials.uid = geteuid(); fCredentials.gid = getegid(); fState = unix_stream_endpoint_state::Listening; } RETURN_ERROR(B_OK); } status_t UnixStreamEndpoint::Connect(const struct sockaddr* _address) { if (_address->sa_family != AF_UNIX) RETURN_ERROR(EAFNOSUPPORT); TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n", find_thread(NULL), this, ConstSocketAddress(&gAddressModule, _address).AsString().Data()); const sockaddr_un* address = (const sockaddr_un*)_address; UnixStreamEndpointLocker endpointLocker(this); if (fState == unix_stream_endpoint_state::Connected) RETURN_ERROR(EISCONN); if (fState != unix_stream_endpoint_state::NotConnected) RETURN_ERROR(B_BAD_VALUE); // TODO: If listening, we could set the backlog to 0 and connect. // check the address first UnixAddress unixAddress; if (address->sun_path[0] == '\0') { // internal address space (or empty address) int32 internalID; if (UnixAddress::IsEmptyAddress(*address)) RETURN_ERROR(B_BAD_VALUE); internalID = UnixAddress::InternalID(*address); if (internalID < 0) RETURN_ERROR(internalID); unixAddress.SetTo(internalID); } else { // FS address space size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path)); if (pathLen == 0 || pathLen == sizeof(address->sun_path)) RETURN_ERROR(B_BAD_VALUE); struct stat st; status_t error = vfs_read_stat(-1, address->sun_path, true, &st, !gStackModule->is_syscall()); if (error != B_OK) RETURN_ERROR(error); if (!S_ISSOCK(st.st_mode)) RETURN_ERROR(B_BAD_VALUE); unixAddress.SetTo(st.st_dev, st.st_ino, NULL); } // get the peer endpoint UnixAddressManagerLocker addressLocker(gAddressManager); UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress); if (listeningUnixEndpoint == NULL) RETURN_ERROR(ECONNREFUSED); UnixStreamEndpoint* listeningEndpoint = dynamic_cast(listeningUnixEndpoint); if (listeningEndpoint == NULL) RETURN_ERROR(EPROTOTYPE); BReference peerReference(listeningEndpoint); addressLocker.Unlock(); UnixStreamEndpointLocker peerLocker(listeningEndpoint); if (!listeningEndpoint->IsBound() || listeningEndpoint->fState != unix_stream_endpoint_state::Listening || listeningEndpoint->fAddress != unixAddress) { RETURN_ERROR(ECONNREFUSED); } // Allocate FIFOs for us and the socket we're going to spawn. We do that // now, so that the mess we need to cleanup, if allocating them fails, is // harmless. UnixFifoType type = fAtomic ? UnixFifoType::Datagram : UnixFifoType::Stream; UnixFifo* fifo = new(nothrow) UnixFifo(socket->receive.buffer_size, type); UnixFifo* peerFifo = new(nothrow) UnixFifo(socket->send.buffer_size, type); ObjectDeleter fifoDeleter(fifo); ObjectDeleter peerFifoDeleter(peerFifo); status_t error; if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK) return error; // spawn new endpoint for accept() net_socket* newSocket; error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket, &newSocket); if (error != B_OK) RETURN_ERROR(error); // init connected peer endpoint UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol; UnixStreamEndpointLocker connectedLocker(connectedEndpoint); connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo); // update our attributes _UnsetReceiveFifo(); fPeerEndpoint = connectedEndpoint; PeerAddress().SetTo(&connectedEndpoint->socket->address); fPeerEndpoint->AcquireReference(); fReceiveFifo = fifo; fCredentials.pid = getpid(); fCredentials.uid = geteuid(); fCredentials.gid = getegid(); fifoDeleter.Detach(); peerFifoDeleter.Detach(); fState = unix_stream_endpoint_state::Connected; fWasConnected = true; gSocketModule->set_connected(Socket()); release_sem(listeningEndpoint->fAcceptSemaphore); connectedLocker.Unlock(); peerLocker.Unlock(); endpointLocker.Unlock(); RETURN_ERROR(B_OK); } status_t UnixStreamEndpoint::Accept(net_socket** _acceptedSocket) { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL), this); bigtime_t timeout = absolute_timeout(socket->receive.timeout); if (gStackModule->is_restarted_syscall()) timeout = gStackModule->restore_syscall_restart_timeout(); else gStackModule->store_syscall_restart_timeout(timeout); UnixStreamEndpointLocker locker(this); status_t error; do { locker.Unlock(); error = acquire_sem_etc(fAcceptSemaphore, 1, B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); if (error < B_OK) break; locker.Lock(); error = gSocketModule->dequeue_connected(socket, _acceptedSocket); } while (error != B_OK); if (error == B_TIMED_OUT && timeout == 0) { // translate non-blocking timeouts to the correct error code error = B_WOULD_BLOCK; } RETURN_ERROR(error); } ssize_t UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount, ancillary_data_container* ancillaryData, const struct sockaddr* address, socklen_t addressLength, int flags) { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n", find_thread(NULL), this, vecs, vecCount, ancillaryData); // TODO: handle MSG_EOR bool atomic = (flags & MSG_EOR) != 0; if ((atomic && !fAtomic) || (flags & ~(MSG_DONTWAIT | MSG_EOR)) != 0) return EOPNOTSUPP; bigtime_t timeout = 0; if ((flags & MSG_DONTWAIT) == 0) { timeout = absolute_timeout(socket->send.timeout); if (gStackModule->is_restarted_syscall()) timeout = gStackModule->restore_syscall_restart_timeout(); else gStackModule->store_syscall_restart_timeout(timeout); } UnixStreamEndpointLocker locker(this); BReference peerReference; UnixStreamEndpointLocker peerLocker; status_t error = _LockConnectedEndpoints(locker, peerLocker); if (error != B_OK) RETURN_ERROR(error); UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; peerReference.SetTo(peerEndpoint); // lock the peer's FIFO UnixFifo* peerFifo = peerEndpoint->fReceiveFifo; BReference _(peerFifo); UnixFifoLocker fifoLocker(peerFifo); // unlock endpoints locker.Unlock(); peerLocker.Unlock(); ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout); // Notify select()ing readers, if we successfully wrote anything. size_t readable = peerFifo->Readable(); bool notifyRead = (error == B_OK && readable > 0 && !peerFifo->IsReadShutdown()); // Notify select()ing writers, if we failed to write anything and there's // still room to write. size_t writable = peerFifo->Writable(); bool notifyWrite = (error != B_OK && writable > 0 && !peerFifo->IsWriteShutdown()); // re-lock our endpoint (unlock FIFO to respect locking order) fifoLocker.Unlock(); locker.Lock(); bool peerLocked = (fPeerEndpoint == peerEndpoint && _LockConnectedEndpoints(locker, peerLocker) == B_OK); // send notifications if (peerLocked && notifyRead) gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable); if (notifyWrite) gSocketModule->notify(socket, B_SELECT_WRITE, writable); switch (result) { case UNIX_FIFO_SHUTDOWN: if (fPeerEndpoint == peerEndpoint && fState == unix_stream_endpoint_state::Connected) { // Orderly write shutdown on our side. // Note: Linux and Solaris also send a SIGPIPE, but according // the send() specification that shouldn't be done. result = EPIPE; } else { // The FD has been closed. result = EBADF; } break; case EPIPE: // The socket module will generate SIGPIPE for us, if necessary. break; case B_TIMED_OUT: // Translate non-blocking timeouts to the correct error code. if (timeout == 0) result = B_WOULD_BLOCK; break; } RETURN_ERROR(result); } ssize_t UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount, ancillary_data_container** _ancillaryData, struct sockaddr* _address, socklen_t* _addressLength, int flags) { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n", find_thread(NULL), this, vecs, vecCount); // TODO: handle MSG_WAITALL if ((flags & ~(MSG_DONTWAIT | MSG_PEEK | MSG_WAITALL)) != 0) return EOPNOTSUPP; bigtime_t timeout = 0; if ((flags & MSG_DONTWAIT) == 0) { timeout = absolute_timeout(socket->receive.timeout); if (gStackModule->is_restarted_syscall()) timeout = gStackModule->restore_syscall_restart_timeout(); else gStackModule->store_syscall_restart_timeout(timeout); } UnixStreamEndpointLocker locker(this); // We can read as long as we have a FIFO. I.e. we are still connected, or // disconnected and not yet reconnected/listening/closed. if (fReceiveFifo == NULL) RETURN_ERROR(ENOTCONN); UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; BReference peerReference(peerEndpoint); // Copy the peer address upfront. This way, if we read something, we don't // get into a potential race with Close(). if (_address != NULL) { socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len); memcpy(_address, &socket->peer, addrLen); *_addressLength = addrLen; } // lock our FIFO UnixFifo* fifo = fReceiveFifo; BReference _(fifo); UnixFifoLocker fifoLocker(fifo); // unlock endpoint locker.Unlock(); ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout, (flags & MSG_PEEK) != 0); // Notify select()ing writers, if we successfully read anything. size_t writable = fifo->Writable(); bool notifyWrite = (result >= 0 && writable > 0 && !fifo->IsWriteShutdown()); // Notify select()ing readers, if we failed to read anything and there's // still something left to read. size_t readable = fifo->Readable(); bool notifyRead = (result < 0 && readable > 0 && !fifo->IsReadShutdown()); // re-lock our endpoint (unlock FIFO to respect locking order) fifoLocker.Unlock(); locker.Lock(); UnixStreamEndpointLocker peerLocker; bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint && _LockConnectedEndpoints(locker, peerLocker) == B_OK); // send notifications if (notifyRead) gSocketModule->notify(socket, B_SELECT_READ, readable); if (peerLocked && notifyWrite) gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable); switch (result) { case UNIX_FIFO_SHUTDOWN: // Either our socket was closed or read shutdown. if (fState == unix_stream_endpoint_state::Closed) { // The FD has been closed. result = EBADF; } else { // if (fReceiveFifo == fifo) { // Orderly shutdown or the peer closed the connection. // } else { // Weird case: Peer closed connection and we are already // reconnected (or listening). // } result = 0; } break; case B_TIMED_OUT: // translate non-blocking timeouts to the correct error code if (timeout == 0) result = B_WOULD_BLOCK; break; } RETURN_ERROR(result); } ssize_t UnixStreamEndpoint::Sendable() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL), this); UnixStreamEndpointLocker locker(this); UnixStreamEndpointLocker peerLocker; status_t error = _LockConnectedEndpoints(locker, peerLocker); if (error != B_OK) RETURN_ERROR(error); // lock the peer's FIFO UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo; UnixFifoLocker fifoLocker(peerFifo); RETURN_ERROR(peerFifo->Writable()); } ssize_t UnixStreamEndpoint::Receivable() { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL), this); UnixStreamEndpointLocker locker(this); if (fState == unix_stream_endpoint_state::Listening) return gSocketModule->count_connected(socket); if (fState != unix_stream_endpoint_state::Connected) RETURN_ERROR(ENOTCONN); UnixFifoLocker fifoLocker(fReceiveFifo); ssize_t readable = fReceiveFifo->Readable(); if (readable == 0 && (fReceiveFifo->IsWriteShutdown() || fReceiveFifo->IsReadShutdown())) { RETURN_ERROR(ENOTCONN); } RETURN_ERROR(readable); } status_t UnixStreamEndpoint::SetReceiveBufferSize(size_t size) { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n", find_thread(NULL), this, size); UnixStreamEndpointLocker locker(this); if (fReceiveFifo == NULL) return B_OK; UnixFifoLocker fifoLocker(fReceiveFifo); return fReceiveFifo->SetBufferCapacity(size); } status_t UnixStreamEndpoint::GetPeerCredentials(ucred* credentials) { UnixStreamEndpointLocker locker(this); UnixStreamEndpointLocker peerLocker; status_t error = _LockConnectedEndpoints(locker, peerLocker); if (error != B_OK) RETURN_ERROR(error); *credentials = fPeerEndpoint->fCredentials; return B_OK; } status_t UnixStreamEndpoint::Shutdown(int direction) { TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n", find_thread(NULL), this, direction); uint32 shutdown; uint32 peerShutdown; // translate the direction into shutdown flags for our and the peer fifo switch (direction) { case SHUT_RD: shutdown = UNIX_FIFO_SHUTDOWN_READ; peerShutdown = 0; break; case SHUT_WR: shutdown = 0; peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; break; case SHUT_RDWR: shutdown = UNIX_FIFO_SHUTDOWN_READ; peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; break; default: RETURN_ERROR(B_BAD_VALUE); } // lock endpoints UnixStreamEndpointLocker locker(this); UnixStreamEndpointLocker peerLocker; status_t error = _LockConnectedEndpoints(locker, peerLocker); if (error != B_OK) RETURN_ERROR(error); // shutdown our FIFO fReceiveFifo->Lock(); fReceiveFifo->Shutdown(shutdown); fReceiveFifo->Unlock(); // shutdown peer FIFO fPeerEndpoint->fReceiveFifo->Lock(); fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown); fPeerEndpoint->fReceiveFifo->Unlock(); // send select notifications if (direction == SHUT_RD || direction == SHUT_RDWR) { gSocketModule->notify(socket, B_SELECT_READ, EPIPE); gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE); } if (direction == SHUT_WR || direction == SHUT_RDWR) { gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE); gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE); } RETURN_ERROR(B_OK); } void UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint, UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo) { ProtocolSocket::Open(); fIsChild = true; fPeerEndpoint = connectingEndpoint; fPeerEndpoint->AcquireReference(); fReceiveFifo = fifo; PeerAddress().SetTo(&connectingEndpoint->socket->address); fCredentials = listeningEndpoint->fCredentials; fState = unix_stream_endpoint_state::Connected; gSocketModule->set_connected(Socket()); } void UnixStreamEndpoint::_Disconnect() { // Both endpoints must be locked. // Write shutdown the receive FIFO. fReceiveFifo->Lock(); fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE); fReceiveFifo->Unlock(); // select() notification. gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET); gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET); // Unset the peer endpoint. fPeerEndpoint->ReleaseReference(); fPeerEndpoint = NULL; // We're officially disconnected. // TODO: Deal with non accept()ed connections correctly! fIsChild = false; fState = unix_stream_endpoint_state::NotConnected; } status_t UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker, UnixStreamEndpointLocker& peerLocker) { if (fState != unix_stream_endpoint_state::Connected) RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN); // We need to lock the peer, too. Get a reference -- we might need to // unlock ourselves to get the locking order right. BReference peerReference(fPeerEndpoint); UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; if (fIsChild) { // We're the child, but locking order is the other way around. locker.Unlock(); peerLocker.SetTo(peerEndpoint, false); locker.Lock(); // recheck our state, also whether the peer is still the same if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) { peerLocker.Unset(); RETURN_ERROR(ENOTCONN); } } else peerLocker.SetTo(peerEndpoint, false); RETURN_ERROR(B_OK); } status_t UnixStreamEndpoint::_Unbind() { if (fState == unix_stream_endpoint_state::Connected || fState == unix_stream_endpoint_state::Listening) RETURN_ERROR(B_BAD_VALUE); if (IsBound()) RETURN_ERROR(UnixEndpoint::_Unbind()); RETURN_ERROR(B_OK); } void UnixStreamEndpoint::_UnsetReceiveFifo() { if (fReceiveFifo) { fReceiveFifo->ReleaseReference(); fReceiveFifo = NULL; } } void UnixStreamEndpoint::_StopListening() { if (fState == unix_stream_endpoint_state::Listening) { delete_sem(fAcceptSemaphore); fAcceptSemaphore = -1; fState = unix_stream_endpoint_state::NotConnected; } }