Skip to content

Commit

Permalink
Rewrite link reciever
Browse files Browse the repository at this point in the history
  • Loading branch information
madhurajayaraman committed Oct 25, 2024
1 parent 14345da commit b390a53
Showing 1 changed file with 122 additions and 65 deletions.
187 changes: 122 additions & 65 deletions starboard/shared/starboard/link_receiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@

#include "starboard/common/file.h"
#include "starboard/common/log.h"
#include "starboard/common/memory.h"
#include "starboard/common/semaphore.h"
#include "starboard/common/socket.h"
#include "starboard/common/string.h"
#include "starboard/configuration_constants.h"
#include "starboard/shared/posix/handle_eintr.h"
#include "starboard/shared/posix/set_non_blocking_internal.h"
#include "starboard/shared/posix/socket_internal.h"
#include "starboard/shared/starboard/application.h"
#include "starboard/socket_waiter.h"
#include "starboard/system.h"
Expand All @@ -38,17 +42,10 @@ namespace starboard {

namespace {

#if defined(SOMAXCONN)
const int kMaxConn = SOMAXCONN;
#else
const int kMaxConn = 128;
#endif

// Creates a socket that is appropriate for binding and listening, but is not
// bound and hasn't started listening yet.
int CreateServerSocket(SbSocketAddressType address_type) {
SB_DLOG(INFO) << "Entering CreateServerSocket";
int socket_fd = -1;
int socket_fd;
switch (address_type) {
case kSbSocketAddressTypeIpv4:
socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
Expand All @@ -57,93 +54,131 @@ int CreateServerSocket(SbSocketAddressType address_type) {
socket_fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
break;
default:
break;
SB_NOTREACHED();
errno = EAFNOSUPPORT;
return -1;
}

if (socket_fd < 0) {
return -1;
}

// All Starboard sockets are non-blocking, so let's ensure it.
if (!posix::SetNonBlocking(socket_fd)) {
// Something went wrong, we'll clean up (preserving errno) and return
// failure.
HANDLE_EINTR(close(socket_fd));
return errno;
}

#if !defined(MSG_NOSIGNAL) && defined(SO_NOSIGPIPE)
// Use SO_NOSIGPIPE to mute SIGPIPE on darwin systems.
int optval_set = 1;
setsockopt(socket_fd, SOL_SOCKET, SO_NOSIGPIPE,
reinterpret_cast<void*>(&optval_set), sizeof(int));
#endif

if (socket_fd < 0) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "Socket create failed, errno: " << errno;
return -1;
}
SB_DLOG(INFO) << "Socket successfully created";

int on = 1;
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "Socket set reuse address failed, errno : " << errno;
int result = setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (result != 0) {
SB_LOG(ERROR) << "Failed to set SO_REUSEADDR on socket " << socket_fd
<< ", errno = " << errno;
return -1;
}
SB_DLOG(INFO) << "Socket created with set reuse address true : " << socket_fd;

return socket_fd;
}

// Creates a server socket that is bound to the loopback interface.
int CreateLocallyBoundSocket(SbSocketAddressType address_type, int port) {
SB_DLOG(INFO) << "Entering CreateLocallyBoundSocket";
int socket = CreateServerSocket(address_type);
if (socket < 0) {
int socket_fd = CreateServerSocket(address_type);
if (socket_fd < 0) {
return -1;
}

struct sockaddr_in addr_in = {0};
socklen_t socklen = static_cast<socklen_t>(sizeof(addr_in));
int local_add_result =
getsockname(socket, reinterpret_cast<sockaddr*>(&addr_in), &socklen);
SbSocketAddress address = {};
bool success = GetLocalhostAddress(address_type, port, &address);
if (!success) {
SB_LOG(ERROR) << "GetLocalhostAddress failed";
return -1;
}

if (local_add_result < 0) {
SB_LOG(ERROR) << "Get local address failed, errno : " << errno;
posix::SockAddr sock_addr;
if (!sock_addr.FromSbSocketAddress(&address)) {
SB_LOG(ERROR) << __FUNCTION__ << ": Invalid address";
return -1;
}
SB_DLOG(INFO) << "Successfully got local address";

int bind_result =
bind(socket, reinterpret_cast<sockaddr*>(&addr_in), sizeof(sockaddr));
SB_DCHECK(socket_fd >= 0);
SbSocketAddress* local_address = &address;
// When binding to the IPV6 any address, ensure that the IPV6_V6ONLY flag is
// off to allow incoming IPV4 connections on the same socket.
// See https://www.ietf.org/rfc/rfc3493.txt for details.
if (local_address && (local_address->type == kSbSocketAddressTypeIpv6) &&
common::MemoryIsZero(local_address->address, 16)) {
int on = 1;
if (setsockopt(socket_fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) !=
0) {
// Silently ignore errors, assume the default behavior is as expected.
errno = 0;
}
}

if (bind_result != 0) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "Socket bind to " << port << " failed, errno : " << errno;
int result =
HANDLE_EINTR(bind(socket_fd, sock_addr.sockaddr(), sock_addr.length));
if (result != 0) {
SB_LOG(ERROR) << __FUNCTION__ << ": Bind failed. errno=" << errno;
return -1;
}
SB_DLOG(INFO) << "Successfully bound socket";

return socket;
return socket_fd;
}

// Creates a server socket that is bound and listening to the loopback interface
// on the given port.
int CreateListeningSocket(SbSocketAddressType address_type, int port) {
SB_DLOG(INFO) << "Entering CreateListeningSocket";
int socket = CreateLocallyBoundSocket(address_type, port);
if (socket < 0) {
int socket_fd = CreateLocallyBoundSocket(address_type, port);
if (socket_fd < 0) {
return -1;
}

int listen_result = listen(socket, kMaxConn);
if (listen_result != 0) {
#if defined(SOMAXCONN)
const int kMaxConn = SOMAXCONN;
#else
const int kMaxConn = 128;
#endif
int result = listen(socket_fd, kMaxConn);
if (result != 0) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "Socket listen failed, errno : " << errno;
<< "SbSocketListen failed: " << result;
return -1;
}
SB_DLOG(INFO) << "Successfully listening";

return socket;
return -1;
}

// Gets the port socket is bound to.
bool GetBoundPort(int socket, int* out_port) {
SB_DCHECK(out_port);
SB_DCHECK(socket >= 0);
SB_DCHECK(socket);

struct sockaddr_in socket_address = {0};
socklen_t socklen = static_cast<socklen_t>(sizeof(socket_address));
int local_address = getsockname(
socket, reinterpret_cast<sockaddr*>(&socket_address), &socklen);
SB_DLOG(INFO) << "Got bound port";
if (local_address != 0) {
SB_DLOG(INFO) << "Bound port failed errno: " << errno;
SbSocketAddress socket_address = {0};
posix::SockAddr sock_addr;
int result = getsockname(socket, sock_addr.sockaddr(), &sock_addr.length);
if (result < 0) {
return false;
}
if (!sock_addr.ToSbSocketAddress(&socket_address)) {
return false;
}

*out_port = socket_address.sin_port;
*out_port = socket_address.port;
return true;
}

Expand Down Expand Up @@ -191,7 +226,7 @@ class LinkReceiver::Impl {
private:
// Encapsulates connection state.
struct Connection {
explicit Connection(int socket) { socket_fd = socket; }
explicit Connection(int socket_fd) { socket = std::move(socket_fd); }
~Connection() {}
void FlushLink(Application* application) {
if (!data.empty()) {
Expand All @@ -200,7 +235,7 @@ class LinkReceiver::Impl {
}
}

int socket_fd;
int socket;
std::string data;
};

Expand All @@ -210,7 +245,7 @@ class LinkReceiver::Impl {

// Adds |socket| to the SbSocketWaiter to wait until ready for accepting a new
// connection.
bool AddForAccept(int socket_fd);
bool AddForAccept(int socket);

// Adds the |connection| to the SbSocketWaiter to wait until ready to read
// more data.
Expand All @@ -230,11 +265,11 @@ class LinkReceiver::Impl {

// SbSocketWaiter entry points.
static void HandleAccept(SbSocketWaiter waiter,
int socket_fd,
int socket,
void* context,
int ready_interests);
static void HandleRead(SbSocketWaiter waiter,
int socket_fd,
int socket,
void* context,
int ready_interests);

Expand Down Expand Up @@ -303,11 +338,11 @@ void LinkReceiver::Impl::Run() {

listen_socket_ =
CreateListeningSocket(kSbSocketAddressTypeIpv4, specified_port_);
if (listen_socket_ < 0) {
if (!listen_socket_ || listen_socket_ < 0) {
listen_socket_ =
CreateListeningSocket(kSbSocketAddressTypeIpv6, specified_port_);
}
if (listen_socket_ < 0) {
if (!listen_socket_ || listen_socket_ < 0) {
SB_LOG(WARNING) << "Unable to start LinkReceiver on port "
<< specified_port_ << ".";
SbSocketWaiterDestroy(waiter_);
Expand Down Expand Up @@ -352,19 +387,19 @@ void LinkReceiver::Impl::Run() {
SbSocketWaiterDestroy(waiter_);
}

bool LinkReceiver::Impl::AddForAccept(int socket_fd) {
if (!SbPosixSocketWaiterAdd(waiter_, socket_fd, this,
bool LinkReceiver::Impl::AddForAccept(int socket) {
if (!SbPosixSocketWaiterAdd(waiter_, socket, this,
&LinkReceiver::Impl::HandleAccept,
kSbSocketWaiterInterestRead, true)) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "SbPosixSocketWaiterAdd failed.";
<< "SbSocketWaiterAdd failed.";
return false;
}
return true;
}

bool LinkReceiver::Impl::AddForRead(Connection* connection) {
if (!SbPosixSocketWaiterAdd(waiter_, connection->socket_fd, this,
if (!SbPosixSocketWaiterAdd(waiter_, connection->socket, this,
&LinkReceiver::Impl::HandleRead,
kSbSocketWaiterInterestRead, false)) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
Expand All @@ -375,10 +410,23 @@ bool LinkReceiver::Impl::AddForRead(Connection* connection) {
}

void LinkReceiver::Impl::OnAcceptReady() {
int accepted_socket = accept(listen_socket_, NULL, NULL);
SB_DCHECK(listen_socket_ >= 0);

int accepted_socket = HANDLE_EINTR(accept(listen_socket_, NULL, NULL));
if (accepted_socket < 0) {
return;
}

// All Starboard sockets are non-blocking, so let's ensure it.
if (!posix::SetNonBlocking(accepted_socket)) {
// Something went wrong, we'll clean up and return failure.
HANDLE_EINTR(close(accepted_socket));
return;
}

SB_DCHECK(accepted_socket);
Connection* connection = new Connection(accepted_socket);
connections_.emplace(connection->socket_fd, connection);
Connection* connection = new Connection(std::move(accepted_socket));
connections_.emplace(connection->socket, connection);
AddForRead(connection);
}

Expand All @@ -389,11 +437,20 @@ void LinkReceiver::Impl::OnReadReady(int socket_fd) {
}

void LinkReceiver::Impl::OnReadReady(Connection* connection) {
int socket = connection->socket_fd;
int socket = connection->socket;

char data[64] = {0};
ssize_t bytes_read = recv(socket, data, SB_ARRAY_SIZE_INT(data), 0);
int read = static_cast<int>(bytes_read);
int read = -1;

const int kRecvFlags = 0;

SB_DCHECK(socket >= 0);

ssize_t bytes_read = recv(socket, data, SB_ARRAY_SIZE_INT(data), kRecvFlags);
if (bytes_read >= 0) {
read = static_cast<int>(bytes_read);
}

int last_null = 0;
for (int position = 0; position < read; ++position) {
if (data[position] == '\0' || data[position] == '\n' ||
Expand Down

0 comments on commit b390a53

Please sign in to comment.