* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include "UnixEndpoint.h"
#include <stdio.h>
#include <sys/stat.h>
#include <AutoDeleter.h>
#include <vfs.h>
#include "UnixAddressManager.h"
#include "UnixFifo.h"
#define UNIX_ENDPOINT_DEBUG_LEVEL 0
#define UNIX_DEBUG_LEVEL UNIX_ENDPOINT_DEBUG_LEVEL
#include "UnixDebug.h"
/*!	Converts a relative socket timeout (as found in socket->send.timeout /
	socket->receive.timeout) into an absolute deadline for
	acquire_sem_etc(..., B_ABSOLUTE_TIMEOUT, ...) style waits.

	The sentinels 0 (do not block) and B_INFINITE_TIMEOUT (block forever) are
	returned unchanged. Any other value is added to the current system_time().
	If that sum would exceed B_INFINITE_TIMEOUT, the result saturates to
	B_INFINITE_TIMEOUT instead of overflowing (signed overflow is undefined
	behavior).
*/
static inline bigtime_t
absolute_timeout(bigtime_t timeout)
{
	if (timeout == 0 || timeout == B_INFINITE_TIMEOUT)
		return timeout;

	bigtime_t now = system_time();

	// now is non-negative, so B_INFINITE_TIMEOUT - now cannot overflow
	if (timeout > B_INFINITE_TIMEOUT - now)
		return B_INFINITE_TIMEOUT;

	return timeout + now;
}
/*!	Creates a closed, unbound endpoint for \a socket.
	No peer, no receive FIFO and no accept semaphore exist yet; Open() moves
	the endpoint into the "not connected" state.
*/
UnixEndpoint::UnixEndpoint(net_socket* socket)
	:
	ProtocolSocket(socket),
	fAddress(),
	fAddressHashLink(),
	fPeerEndpoint(NULL),
	fReceiveFifo(NULL),
	fState(UNIX_ENDPOINT_CLOSED),
	fAcceptSemaphore(-1),
	fIsChild(false),
	fWasConnected(false)
{
	// the endpoint lock guards all of the state initialized above
	mutex_init(&fLock, "unix endpoint");

	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::UnixEndpoint()\n",
		find_thread(NULL), this);
}
/*!	Destroys the endpoint; only the endpoint lock is torn down here. */
UnixEndpoint::~UnixEndpoint()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::~UnixEndpoint()\n",
		find_thread(NULL), this);

	// counterpart of the mutex_init() done in the constructor
	mutex_destroy(&fLock);
}
/*!	Protocol initialization hook. There is nothing to set up here; the state
	transition out of UNIX_ENDPOINT_CLOSED happens in Open().
	\return Always B_OK.
*/
status_t
UnixEndpoint::Init()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Init()\n", find_thread(NULL),
		this);

	const status_t status = B_OK;
	RETURN_ERROR(status);
}
void
UnixEndpoint::Uninit()
{
TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Uninit()\n", find_thread(NULL),
this);
UnixEndpointLocker locker(this);
bool closed = (fState == UNIX_ENDPOINT_CLOSED);
locker.Unlock();
if (!closed) {
Close();
}
ReleaseReference();
}
/*!	Opens the underlying protocol socket and, on success, moves the endpoint
	into the UNIX_ENDPOINT_NOT_CONNECTED state.
	\return B_OK, or the error reported by ProtocolSocket::Open().
*/
status_t
UnixEndpoint::Open()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Open()\n", find_thread(NULL),
		this);

	status_t status = ProtocolSocket::Open();
	if (status == B_OK)
		fState = UNIX_ENDPOINT_NOT_CONNECTED;

	RETURN_ERROR(status);
}
/*!	Closes the endpoint.
	If connected (and the connection is still intact once both endpoints are
	locked), both sides are disconnected. If listening, listening stops. The
	address is unbound in any case and the state becomes
	UNIX_ENDPOINT_CLOSED.
	\return Always B_OK.
*/
status_t
UnixEndpoint::Close()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Close()\n", find_thread(NULL),
		this);

	UnixEndpointLocker locker(this);

	if (fState == UNIX_ENDPOINT_CONNECTED) {
		UnixEndpointLocker peerLocker;
		status_t lockStatus = _LockConnectedEndpoints(locker, peerLocker);
		if (lockStatus == B_OK) {
			// still connected: tear down both directions
			fPeerEndpoint->_Disconnect();
			_Disconnect();
		}
	}

	// Checked separately: the state is re-read after the connected case,
	// which may have dropped and re-acquired our lock.
	if (fState == UNIX_ENDPOINT_LISTENING)
		_StopListening();

	_Unbind();
	fState = UNIX_ENDPOINT_CLOSED;

	RETURN_ERROR(B_OK);
}
/*!	Releases the endpoint's receive FIFO (if any) under the endpoint lock.
	\return Always B_OK.
*/
status_t
UnixEndpoint::Free()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Free()\n", find_thread(NULL),
		this);

	{
		UnixEndpointLocker locker(this);
		_UnsetReceiveFifo();
	}

	RETURN_ERROR(B_OK);
}
/*!	Binds the endpoint to \a _address.

	An address whose sun_path starts with a NUL byte is an internal address.
	An empty one gets the next unused internal ID from the address manager;
	otherwise the ID encoded in the address is used. Any other address names a
	file system path, at which a socket node is created.
	The bound address is written to socket->address and the endpoint is
	registered with gAddressManager.

	\return B_OK on success; EAFNOSUPPORT for non-AF_UNIX addresses;
		B_BAD_VALUE if the endpoint is not in the not-connected state, is
		already bound, or the path is empty/unterminated; EADDRINUSE if a
		file already exists at the path; otherwise the underlying error.
*/
status_t
UnixEndpoint::Bind(const struct sockaddr *_address)
{
	if (_address->sa_family != AF_UNIX)
		RETURN_ERROR(EAFNOSUPPORT);

	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Bind(\"%s\")\n",
		find_thread(NULL), this,
		ConstSocketAddress(&gAddressModule, _address).AsString().Data());

	const sockaddr_un* address = (const sockaddr_un*)_address;

	UnixEndpointLocker endpointLocker(this);

	// only an opened, unconnected endpoint without an address can be bound
	if (fState != UNIX_ENDPOINT_NOT_CONNECTED || IsBound())
		RETURN_ERROR(B_BAD_VALUE);

	if (address->sun_path[0] != '\0') {
		// File system address: the path must be non-empty and NUL-terminated
		// within sun_path.
		size_t pathLength = strnlen(address->sun_path,
			sizeof(address->sun_path));
		if (pathLength == 0 || pathLength == sizeof(address->sun_path))
			RETURN_ERROR(B_BAD_VALUE);

		struct vnode* node;
		status_t status = vfs_create_special_node(address->sun_path, NULL,
			S_IFSOCK | 0644, 0, !gStackModule->is_syscall(), NULL, &node);
		if (status != B_OK) {
			// an existing file at that path means the address is taken
			RETURN_ERROR(status == B_FILE_EXISTS ? EADDRINUSE : status);
		}

		status = _Bind(node);
		if (status != B_OK) {
			vfs_put_vnode(node);
			RETURN_ERROR(status);
		}

		// store the address, cut off right after the path's terminator
		size_t addressLength
			= address->sun_path + pathLength + 1 - (char*)address;
		memcpy(&socket->address, address, addressLength);
		socket->address.ss_len = addressLength;

		UnixAddressManagerLocker addressLocker(gAddressManager);
		gAddressManager.Add(this);

		RETURN_ERROR(B_OK);
	}

	// Internal address: hold the address manager lock while an ID is picked
	// and the endpoint is registered under it.
	UnixAddressManagerLocker addressLocker(gAddressManager);

	int32 id = UnixAddress::IsEmptyAddress(*address)
		? gAddressManager.NextUnusedInternalID()
		: UnixAddress::InternalID(*address);
	if (id < 0)
		RETURN_ERROR(id);

	status_t status = _Bind(id);
	if (status != B_OK)
		RETURN_ERROR(status);

	sockaddr_un* boundAddress = (sockaddr_un*)&socket->address;
	boundAddress->sun_path[0] = '\0';
	sprintf(boundAddress->sun_path + 1, "%05" B_PRIx32, id);
	boundAddress->sun_len = INTERNAL_UNIX_ADDRESS_LEN;

	gAddressManager.Add(this);

	RETURN_ERROR(B_OK);
}
/*!	Removes the endpoint's address binding under the endpoint lock.
	\return The result of _Unbind().
*/
status_t
UnixEndpoint::Unbind()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Unbind()\n", find_thread(NULL),
		this);

	UnixEndpointLocker endpointLocker(this);
	status_t status = _Unbind();
	RETURN_ERROR(status);
}
/*!	Puts a bound endpoint into the listening state, or just updates the
	backlog if it is already listening.

	On the first call an accept semaphore is created, any receive FIFO is
	released, and the caller's pid/euid/egid are recorded as credentials.

	\return B_OK; EDESTADDRREQ if unbound; EINVAL if connected or closed;
		ENOBUFS if the accept semaphore cannot be created.
*/
status_t
UnixEndpoint::Listen(int backlog)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Listen(%d)\n", find_thread(NULL),
		this, backlog);

	UnixEndpointLocker endpointLocker(this);

	if (!IsBound())
		RETURN_ERROR(EDESTADDRREQ);

	bool alreadyListening = (fState == UNIX_ENDPOINT_LISTENING);
	if (!alreadyListening && fState != UNIX_ENDPOINT_NOT_CONNECTED)
		RETURN_ERROR(EINVAL);

	gSocketModule->set_max_backlog(socket, backlog);

	// a repeated listen() only adjusts the backlog
	if (alreadyListening)
		RETURN_ERROR(B_OK);

	fAcceptSemaphore = create_sem(0, "unix accept");
	if (fAcceptSemaphore < 0)
		RETURN_ERROR(ENOBUFS);

	_UnsetReceiveFifo();

	fCredentials.pid = getpid();
	fCredentials.uid = geteuid();
	fCredentials.gid = getegid();

	fState = UNIX_ENDPOINT_LISTENING;

	RETURN_ERROR(B_OK);
}
/*!	Connects this endpoint to the endpoint listening on \a _address.

	A new child endpoint is spawned from the listener's socket and becomes
	this endpoint's peer; it is handed out later by the listener's Accept().
	Each side gets its own receive FIFO: \c fifo for this endpoint and
	\c peerFifo for the child.

	\return B_OK on success; EAFNOSUPPORT for non-AF_UNIX addresses; EISCONN
		if already connected; B_BAD_VALUE for other states or malformed /
		non-socket addresses; ECONNREFUSED if nobody listens on the address;
		ENOBUFS if the FIFOs cannot be allocated; otherwise the error from
		the VFS, FIFO initialization or the socket module.
*/
status_t
UnixEndpoint::Connect(const struct sockaddr *_address)
{
	if (_address->sa_family != AF_UNIX)
		RETURN_ERROR(EAFNOSUPPORT);

	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Connect(\"%s\")\n",
		find_thread(NULL), this,
		ConstSocketAddress(&gAddressModule, _address).AsString().Data());

	const sockaddr_un* address = (const sockaddr_un*)_address;

	UnixEndpointLocker endpointLocker(this);

	if (fState == UNIX_ENDPOINT_CONNECTED)
		RETURN_ERROR(EISCONN);

	if (fState != UNIX_ENDPOINT_NOT_CONNECTED)
		RETURN_ERROR(B_BAD_VALUE);

	// Translate the socket address into a UnixAddress: either an internal ID
	// or the device/inode pair of a socket node in the file system.
	UnixAddress unixAddress;

	if (address->sun_path[0] == '\0') {
		int32 internalID;
		if (UnixAddress::IsEmptyAddress(*address))
			RETURN_ERROR(B_BAD_VALUE);

		internalID = UnixAddress::InternalID(*address);
		if (internalID < 0)
			RETURN_ERROR(internalID);

		unixAddress.SetTo(internalID);
	} else {
		// the path must be non-empty and NUL-terminated within sun_path
		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
			RETURN_ERROR(B_BAD_VALUE);

		struct stat st;
		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
			!gStackModule->is_syscall());
		if (error != B_OK)
			RETURN_ERROR(error);

		if (!S_ISSOCK(st.st_mode))
			RETURN_ERROR(B_BAD_VALUE);

		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
	}

	// Look up the listening endpoint and take a reference, so it stays valid
	// once the address manager lock is dropped.
	UnixAddressManagerLocker addressLocker(gAddressManager);

	UnixEndpoint* listeningEndpoint = gAddressManager.Lookup(unixAddress);
	if (listeningEndpoint == NULL)
		RETURN_ERROR(ECONNREFUSED);

	BReference<UnixEndpoint> peerReference(listeningEndpoint);

	addressLocker.Unlock();

	UnixEndpointLocker peerLocker(listeningEndpoint);

	// Re-validate under the listener's lock: it may have stopped listening or
	// been unbound/rebound since the lookup.
	if (!listeningEndpoint->IsBound()
		|| listeningEndpoint->fState != UNIX_ENDPOINT_LISTENING
		|| listeningEndpoint->fAddress != unixAddress) {
		RETURN_ERROR(ECONNREFUSED);
	}

	// Create the FIFOs: fifo becomes our receive FIFO, peerFifo the spawned
	// child's.
	UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
	UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);

	// new(nothrow) yields NULL on allocation failure -- never call Init() on
	// a NULL FIFO.
	if (fifo == NULL || peerFifo == NULL)
		RETURN_ERROR(ENOBUFS);

	status_t error;
	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
		RETURN_ERROR(error);

	// spawn the child endpoint the listener will hand out via Accept()
	net_socket* newSocket;
	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
		&newSocket);
	if (error != B_OK)
		RETURN_ERROR(error);

	UnixEndpoint* connectedEndpoint = (UnixEndpoint*)newSocket->first_protocol;
	UnixEndpointLocker connectedLocker(connectedEndpoint);

	connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);

	// connect this endpoint to the child
	_UnsetReceiveFifo();
	fPeerEndpoint = connectedEndpoint;
	PeerAddress().SetTo(&connectedEndpoint->socket->address);
	fPeerEndpoint->AcquireReference();
	fReceiveFifo = fifo;

	fCredentials.pid = getpid();
	fCredentials.uid = geteuid();
	fCredentials.gid = getegid();

	// both FIFOs are now owned by the endpoints
	fifoDeleter.Detach();
	peerFifoDeleter.Detach();

	fState = UNIX_ENDPOINT_CONNECTED;
	fWasConnected = true;

	gSocketModule->set_connected(newSocket);

	// wake up a thread waiting in the listener's Accept()
	release_sem(listeningEndpoint->fAcceptSemaphore);

	connectedLocker.Unlock();
	peerLocker.Unlock();
	endpointLocker.Unlock();

	RETURN_ERROR(B_OK);
}
/*!	Waits for a pending connection on this listening endpoint and returns its
	socket in \a _acceptedSocket.

	The wait honors the socket's receive timeout; the absolute deadline is
	saved for, or restored on, syscall restart via gStackModule. A
	non-blocking socket (timeout 0) with no pending connection gets
	B_WOULD_BLOCK instead of B_TIMED_OUT.

	\return B_OK, B_WOULD_BLOCK, or the error from acquire_sem_etc() (e.g.
		B_TIMED_OUT, B_INTERRUPTED).
*/
status_t
UnixEndpoint::Accept(net_socket **_acceptedSocket)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Accept()\n", find_thread(NULL),
		this);

	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
	if (gStackModule->is_restarted_syscall())
		timeout = gStackModule->restore_syscall_restart_timeout();
	else
		gStackModule->store_syscall_restart_timeout(timeout);

	UnixEndpointLocker locker(this);

	status_t error;
	do {
		// don't hold the endpoint lock while blocking on the semaphore
		locker.Unlock();

		error = acquire_sem_etc(fAcceptSemaphore, 1,
			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
		if (error < B_OK) {
			// Leave the loop rather than returning here, so the non-blocking
			// timeout translation below is applied.
			break;
		}

		locker.Lock();
		error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
	} while (error != B_OK);

	if (error == B_TIMED_OUT && timeout == 0) {
		// translate non-blocking timeouts to the correct error code
		error = B_WOULD_BLOCK;
	}

	RETURN_ERROR(error);
}
/*!	Writes \a vecs (and optional \a ancillaryData) into the peer's receive
	FIFO.

	Waits according to the socket's send timeout; the deadline is saved for,
	or restored on, syscall restart. The endpoint locks are dropped while
	writing; only the FIFO lock is held.

	Afterwards, select()ers are notified:
	- the peer's readers, if its FIFO has readable data and is not
	  read-shut;
	- our own writers, if the write failed but the FIFO still has room and is
	  not write-shut.

	\return The result of UnixFifo::Write(), with these adjustments:
		- UNIX_FIFO_SHUTDOWN is mapped to EPIPE if we are still connected to
		  the same peer, else EBADF.
		- EPIPE raises SIGPIPE for syscall callers.
		- A non-blocking B_TIMED_OUT becomes B_WOULD_BLOCK.
*/
ssize_t
UnixEndpoint::Send(const iovec *vecs, size_t vecCount,
	ancillary_data_container *ancillaryData)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Send(%p, %ld, %p)\n",
		find_thread(NULL), this, vecs, vecCount, ancillaryData);

	bigtime_t timeout = absolute_timeout(socket->send.timeout);
	if (gStackModule->is_restarted_syscall())
		timeout = gStackModule->restore_syscall_restart_timeout();
	else
		gStackModule->store_syscall_restart_timeout(timeout);

	UnixEndpointLocker locker(this);

	BReference<UnixEndpoint> peerReference;
	UnixEndpointLocker peerLocker;

	status_t error = _LockConnectedEndpoints(locker, peerLocker);
	if (error != B_OK)
		RETURN_ERROR(error);

	UnixEndpoint* peerEndpoint = fPeerEndpoint;
	peerReference.SetTo(peerEndpoint);

	// keep the peer's FIFO alive and locked across the write
	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
	BReference<UnixFifo> _(peerFifo);
	UnixFifoLocker fifoLocker(peerFifo);

	// don't hold the endpoint locks while (possibly) blocking in Write()
	locker.Unlock();
	peerLocker.Unlock();

	ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, timeout);

	// Notify the peer's select()ing readers if there is data for them.
	size_t readable = peerFifo->Readable();
	bool notifyRead = (readable > 0 && !peerFifo->IsReadShutdown());

	// Notify our select()ing writers if the write failed but there is still
	// room. This must test the write result: `error` is always B_OK here.
	size_t writable = peerFifo->Writable();
	bool notifyWrite = (result < 0 && writable > 0
		&& !peerFifo->IsWriteShutdown());

	fifoLocker.Unlock();

	// re-lock; only notify the peer if it is still our connected peer
	locker.Lock();
	bool peerLocked = (fPeerEndpoint == peerEndpoint
		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);

	if (peerLocked && notifyRead)
		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
	if (notifyWrite)
		gSocketModule->notify(socket, B_SELECT_WRITE, writable);

	switch (result) {
		case UNIX_FIFO_SHUTDOWN:
			if (fPeerEndpoint == peerEndpoint
				&& fState == UNIX_ENDPOINT_CONNECTED) {
				// orderly write shutdown on our side
				result = EPIPE;
			} else {
				// the endpoint has been closed/disconnected meanwhile
				result = EBADF;
			}
			break;
		case EPIPE:
			// the peer went away or shut down its read side
			if (gStackModule->is_syscall())
				send_signal(find_thread(NULL), SIGPIPE);
			break;
		case B_TIMED_OUT:
			// translate non-blocking timeouts to the correct error code
			if (timeout == 0)
				result = B_WOULD_BLOCK;
			break;
	}

	RETURN_ERROR(result);
}
/*!	Reads from this endpoint's receive FIFO into \a vecs.

	Waits according to the socket's receive timeout. The deadline is saved
	for, or restored on, syscall restart.

	If \a _address is given, socket->peer is copied into it, truncated to
	*\a _addressLength, and *\a _addressLength is set to the copied length.

	Afterwards, select()ers are notified:
	- the peer's writers, if the read succeeded and the FIFO has room and is
	  not write-shut;
	- our own readers, if the read failed but data is still readable and the
	  FIFO is not read-shut.

	\return The result of UnixFifo::Read() (presumably the number of bytes
		read), with these adjustments:
		- UNIX_FIFO_SHUTDOWN becomes 0 (end of stream), or EBADF if the
		  endpoint has been closed.
		- A non-blocking B_TIMED_OUT becomes B_WOULD_BLOCK.
		- ENOTCONN is returned if there is no receive FIFO.
*/
ssize_t
UnixEndpoint::Receive(const iovec *vecs, size_t vecCount,
ancillary_data_container **_ancillaryData, struct sockaddr *_address,
socklen_t *_addressLength)
{
TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Receive(%p, %ld)\n",
find_thread(NULL), this, vecs, vecCount);
// Absolute deadline; a restarted syscall reuses the one stored before.
bigtime_t timeout = absolute_timeout(socket->receive.timeout);
if (gStackModule->is_restarted_syscall())
timeout = gStackModule->restore_syscall_restart_timeout();
else
gStackModule->store_syscall_restart_timeout(timeout);
UnixEndpointLocker locker(this);
// No receive FIFO means there is nothing to read from.
if (fReceiveFifo == NULL)
RETURN_ERROR(ENOTCONN);
// Remember the current peer (may be NULL) and keep it referenced, so it can
// be compared/notified after our lock has been dropped and re-acquired.
UnixEndpoint* peerEndpoint = fPeerEndpoint;
BReference<UnixEndpoint> peerReference(peerEndpoint);
// Report the peer address, truncated to the caller's buffer size.
if (_address != NULL) {
socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
memcpy(_address, &socket->peer, addrLen);
*_addressLength = addrLen;
}
// Keep the FIFO referenced and locked, but release the endpoint lock while
// (possibly) blocking in Read().
UnixFifo* fifo = fReceiveFifo;
BReference<UnixFifo> _(fifo);
UnixFifoLocker fifoLocker(fifo);
locker.Unlock();
ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, timeout);
// Peer writers: wake them if we consumed data and there is room now.
size_t writable = fifo->Writable();
bool notifyWrite = (result >= 0 && writable > 0
&& !fifo->IsWriteShutdown());
// Our readers: re-notify if this read failed but data is still there.
size_t readable = fifo->Readable();
bool notifyRead = (result < 0 && readable > 0
&& !fifo->IsReadShutdown());
fifoLocker.Unlock();
// Re-lock; the peer is only notified if it is still our connected peer.
locker.Lock();
UnixEndpointLocker peerLocker;
bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
if (notifyRead)
gSocketModule->notify(socket, B_SELECT_READ, readable);
if (peerLocked && notifyWrite)
gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
switch (result) {
case UNIX_FIFO_SHUTDOWN:
// Shut-down FIFO: EOF (0) unless the endpoint itself was closed.
if (fState == UNIX_ENDPOINT_CLOSED) {
result = EBADF;
} else {
result = 0;
}
break;
case B_TIMED_OUT:
// Non-blocking socket without data.
if (timeout == 0)
result = B_WOULD_BLOCK;
break;
}
RETURN_ERROR(result);
}
/*!	Returns how many bytes can currently be written without blocking, i.e.
	the free space in the peer's receive FIFO, or the error from
	_LockConnectedEndpoints() if not connected.
*/
ssize_t
UnixEndpoint::Sendable()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Sendable()\n", find_thread(NULL),
		this);

	UnixEndpointLocker locker(this);
	UnixEndpointLocker peerLocker;

	status_t lockStatus = _LockConnectedEndpoints(locker, peerLocker);
	if (lockStatus != B_OK)
		RETURN_ERROR(lockStatus);

	// what we send ends up in the peer's receive FIFO
	UnixFifo* targetFifo = fPeerEndpoint->fReceiveFifo;
	UnixFifoLocker fifoLocker(targetFifo);

	RETURN_ERROR(targetFifo->Writable());
}
/*!	Returns how much can be "received" right now.

	For a listening endpoint this is gSocketModule->count_connected()
	(presumably the number of connections waiting to be accepted). For a
	connected endpoint it is the number of readable bytes in the receive
	FIFO.

	\return The count; ENOTCONN if neither listening nor connected, or if the
		FIFO is empty and shut down in either direction.
*/
ssize_t
UnixEndpoint::Receivable()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Receivable()\n", find_thread(NULL),
		this);

	UnixEndpointLocker locker(this);

	if (fState == UNIX_ENDPOINT_LISTENING)
		return gSocketModule->count_connected(socket);

	if (fState != UNIX_ENDPOINT_CONNECTED)
		RETURN_ERROR(ENOTCONN);

	UnixFifoLocker fifoLocker(fReceiveFifo);
	ssize_t available = fReceiveFifo->Readable();

	// buffered data is reported even after a shutdown
	if (available != 0)
		RETURN_ERROR(available);

	// an empty FIFO that is shut down counts as disconnected
	if (fReceiveFifo->IsWriteShutdown() || fReceiveFifo->IsReadShutdown())
		RETURN_ERROR(ENOTCONN);

	RETURN_ERROR(available);
}
/*!	Sets the capacity of the receive FIFO to \a size.
	\return B_BAD_VALUE if there is no receive FIFO, otherwise the result of
		UnixFifo::SetBufferCapacity().
*/
status_t
UnixEndpoint::SetReceiveBufferSize(size_t size)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::SetReceiveBufferSize(%lu)\n",
		find_thread(NULL), this, size);

	UnixEndpointLocker locker(this);

	UnixFifo* fifo = fReceiveFifo;
	if (fifo == NULL)
		return B_BAD_VALUE;

	UnixFifoLocker fifoLocker(fifo);
	return fifo->SetBufferCapacity(size);
}
/*!	Copies the connected peer's recorded credentials into \a credentials.
	\return B_OK, or the error from _LockConnectedEndpoints() if not
		connected.
*/
status_t
UnixEndpoint::GetPeerCredentials(ucred* credentials)
{
	UnixEndpointLocker locker(this);
	UnixEndpointLocker peerLocker;

	status_t lockStatus = _LockConnectedEndpoints(locker, peerLocker);
	if (lockStatus == B_OK) {
		*credentials = fPeerEndpoint->fCredentials;
		return B_OK;
	}

	RETURN_ERROR(lockStatus);
}
/*!	Implements shutdown() for a connected endpoint.

	Shutting down the read side marks our receive FIFO read-shut. Shutting
	down the write side marks the peer's receive FIFO write-shut. Both sides'
	select()ers are notified with EPIPE.

	\return B_OK; B_BAD_VALUE for an unknown \a direction; otherwise the error
		from _LockConnectedEndpoints().
*/
status_t
UnixEndpoint::Shutdown(int direction)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Shutdown(%d)\n",
		find_thread(NULL), this, direction);

	bool closeRead;
	bool closeWrite;
	switch (direction) {
		case SHUT_RD:
			closeRead = true;
			closeWrite = false;
			break;
		case SHUT_WR:
			closeRead = false;
			closeWrite = true;
			break;
		case SHUT_RDWR:
			closeRead = true;
			closeWrite = true;
			break;
		default:
			RETURN_ERROR(B_BAD_VALUE);
	}

	uint32 ownShutdown = closeRead ? UNIX_FIFO_SHUTDOWN_READ : 0;
	uint32 peerShutdown = closeWrite ? UNIX_FIFO_SHUTDOWN_WRITE : 0;

	UnixEndpointLocker locker(this);
	UnixEndpointLocker peerLocker;

	status_t lockStatus = _LockConnectedEndpoints(locker, peerLocker);
	if (lockStatus != B_OK)
		RETURN_ERROR(lockStatus);

	UnixFifo* ownFifo = fReceiveFifo;
	ownFifo->Lock();
	ownFifo->Shutdown(ownShutdown);
	ownFifo->Unlock();

	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
	peerFifo->Lock();
	peerFifo->Shutdown(peerShutdown);
	peerFifo->Unlock();

	if (closeRead) {
		gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
	}

	if (closeWrite) {
		gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
	}

	RETURN_ERROR(B_OK);
}
/*!	Turns this freshly spawned endpoint into the connected child of
	\a listeningEndpoint. Its peer is \a connectingEndpoint (which gains a
	reference) and its receive FIFO is \a fifo. The listener's credentials
	are copied over.
*/
void
UnixEndpoint::_Spawn(UnixEndpoint* connectingEndpoint,
	UnixEndpoint* listeningEndpoint, UnixFifo* fifo)
{
	ProtocolSocket::Open();

	// the connecting endpoint is our peer; we are the "child" side
	fIsChild = true;
	connectingEndpoint->AcquireReference();
	fPeerEndpoint = connectingEndpoint;
	PeerAddress().SetTo(&connectingEndpoint->socket->address);

	fReceiveFifo = fifo;
	fCredentials = listeningEndpoint->fCredentials;

	fState = UNIX_ENDPOINT_CONNECTED;
}
void
UnixEndpoint::_Disconnect()
{
fReceiveFifo->Lock();
fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
fReceiveFifo->Unlock();
gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
fPeerEndpoint->ReleaseReference();
fPeerEndpoint = NULL;
fIsChild = false;
fState = UNIX_ENDPOINT_NOT_CONNECTED;
}
/*!	Additionally locks the peer endpoint and attaches it to \a peerLocker.
	This endpoint must already be locked through \a locker.

	The non-child (connecting) endpoint locks its peer while holding its own
	lock. A child endpoint instead drops its own lock, locks the peer, and
	then re-locks itself. So the connecting side is always locked first,
	presumably to avoid lock-order deadlocks. Because the child's lock was
	dropped, the connection is re-validated afterwards.

	\return B_OK with both endpoints locked. Otherwise:
		- EPIPE if not connected but connected before;
		- ENOTCONN if never connected, or if the connection changed while our
		  lock was dropped. On this path \a peerLocker may still hold the
		  former peer's lock until it goes out of scope.
*/
status_t
UnixEndpoint::_LockConnectedEndpoints(UnixEndpointLocker& locker,
UnixEndpointLocker& peerLocker)
{
if (fState != UNIX_ENDPOINT_CONNECTED)
RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);
// Keep the peer alive while our own lock may be released below.
BReference<UnixEndpoint> peerReference(fPeerEndpoint);
UnixEndpoint* peerEndpoint = fPeerEndpoint;
if (fIsChild) {
// Child: lock the peer first, then ourselves again.
locker.Unlock();
peerLocker.SetTo(peerEndpoint, false);
locker.Lock();
// The connection may have been torn down or replaced meanwhile.
if (fState != UNIX_ENDPOINT_CONNECTED || peerEndpoint != fPeerEndpoint)
RETURN_ERROR(ENOTCONN);
} else
peerLocker.SetTo(peerEndpoint, false);
RETURN_ERROR(B_OK);
}
/*!	Sets the endpoint address from the device/inode of the socket node
	\a vnode; the vnode itself is remembered in the address.
	\return B_OK, or the error from vfs_stat_vnode().
*/
status_t
UnixEndpoint::_Bind(struct vnode* vnode)
{
	struct stat st;
	status_t status = vfs_stat_vnode(vnode, &st);
	if (status == B_OK)
		fAddress.SetTo(st.st_dev, st.st_ino, vnode);

	RETURN_ERROR(status);
}
/*!	Sets the endpoint address to the internal address \a internalID.
	\return Always B_OK.
*/
status_t
UnixEndpoint::_Bind(int32 internalID)
{
	fAddress.SetTo(internalID);

	const status_t status = B_OK;
	RETURN_ERROR(status);
}
/*!	Removes the address binding.
	The endpoint is unregistered from gAddressManager, and a file system
	address's vnode is put.
	\return B_OK (also when not bound); B_BAD_VALUE while connected or
		listening.
*/
status_t
UnixEndpoint::_Unbind()
{
	// an active (connected or listening) endpoint keeps its address
	if (fState == UNIX_ENDPOINT_CONNECTED || fState == UNIX_ENDPOINT_LISTENING)
		RETURN_ERROR(B_BAD_VALUE);

	if (!IsBound())
		RETURN_ERROR(B_OK);

	UnixAddressManagerLocker addressLocker(gAddressManager);
	gAddressManager.Remove(this);

	// only file system addresses carry a vnode
	struct vnode* boundNode = fAddress.Vnode();
	if (boundNode != NULL)
		vfs_put_vnode(boundNode);

	fAddress.Unset();

	RETURN_ERROR(B_OK);
}
/*!	Drops this endpoint's reference to its receive FIFO, if it has one. */
void
UnixEndpoint::_UnsetReceiveFifo()
{
	if (fReceiveFifo == NULL)
		return;

	fReceiveFifo->ReleaseReference();
	fReceiveFifo = NULL;
}
/*!	Leaves the listening state.
	Deletes the accept semaphore and returns to UNIX_ENDPOINT_NOT_CONNECTED.
	Does nothing if the endpoint is not listening.
*/
void
UnixEndpoint::_StopListening()
{
	if (fState != UNIX_ENDPOINT_LISTENING)
		return;

	delete_sem(fAcceptSemaphore);
	fAcceptSemaphore = -1;
	fState = UNIX_ENDPOINT_NOT_CONNECTED;
}