From dc95bd0655fc34f0844056d872e1fc7291d23e4b Mon Sep 17 00:00:00 2001 From: Madhura Jayaraman Date: Fri, 1 Nov 2024 06:43:26 -0700 Subject: [PATCH] Posix-ify link_receiver.cc (#4229) (#4333) b/370776046 --- starboard/shared/starboard/link_receiver.cc | 224 ++++++++++++++------ 1 file changed, 157 insertions(+), 67 deletions(-) diff --git a/starboard/shared/starboard/link_receiver.cc b/starboard/shared/starboard/link_receiver.cc index c082fb2e5cc8..1d6befbf43e5 100644 --- a/starboard/shared/starboard/link_receiver.cc +++ b/starboard/shared/starboard/link_receiver.cc @@ -14,6 +14,8 @@ #include "starboard/shared/starboard/link_receiver.h" +#include +#include #include #include #include @@ -22,10 +24,14 @@ #include "starboard/common/file.h" #include "starboard/common/log.h" +#include "starboard/common/memory.h" #include "starboard/common/semaphore.h" #include "starboard/common/socket.h" #include "starboard/common/string.h" #include "starboard/configuration_constants.h" +#include "starboard/shared/posix/handle_eintr.h" +#include "starboard/shared/posix/set_non_blocking_internal.h" +#include "starboard/shared/posix/socket_internal.h" #include "starboard/shared/starboard/application.h" #include "starboard/socket_waiter.h" #include "starboard/system.h" @@ -35,77 +41,139 @@ namespace shared { namespace starboard { namespace { + // Creates a socket that is appropriate for binding and listening, but is not // bound and hasn't started listening yet. -std::unique_ptr CreateServerSocket(SbSocketAddressType address_type) { - std::unique_ptr socket(new Socket(address_type)); - if (!socket->IsValid()) { - SB_LOG(ERROR) << __FUNCTION__ << ": " - << "SbSocketCreate failed"; - return std::unique_ptr(); +int CreateServerSocket(SbSocketAddressType address_type) { + int socket_fd; + switch (address_type) { + case kSbSocketAddressTypeIpv4: + socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + break; + case kSbSocketAddressTypeIpv6: + socket_fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + break; + default: + SB_NOTREACHED(); + errno = EAFNOSUPPORT; + return -1; + } + + if (socket_fd < 0) { + return -1; } - if (!socket->SetReuseAddress(true)) { + // All Starboard sockets are non-blocking, so let's ensure it. + if (!posix::SetNonBlocking(socket_fd)) { + // Something went wrong, we'll clean up (preserving errno) and return + // failure. + HANDLE_EINTR(close(socket_fd)); + return errno; + } + +#if !defined(MSG_NOSIGNAL) && defined(SO_NOSIGPIPE) + // Use SO_NOSIGPIPE to mute SIGPIPE on darwin systems. + int optval_set = 1; + setsockopt(socket_fd, SOL_SOCKET, SO_NOSIGPIPE, + reinterpret_cast(&optval_set), sizeof(int)); +#endif + + if (socket_fd < 0) { SB_LOG(ERROR) << __FUNCTION__ << ": " - << "SbSocketSetReuseAddress failed"; - return std::unique_ptr(); + << "Socket create failed, errno: " << errno; + return -1; } - return socket; + int on = 1; + int result = setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (result != 0) { + SB_LOG(ERROR) << "Failed to set SO_REUSEADDR on socket " << socket_fd + << ", errno = " << errno; + return -1; + } + + return socket_fd; } // Creates a server socket that is bound to the loopback interface. -std::unique_ptr CreateLocallyBoundSocket( - SbSocketAddressType address_type, - int port) { - std::unique_ptr socket = CreateServerSocket(address_type); - if (!socket) { - return std::unique_ptr(); +int CreateLocallyBoundSocket(SbSocketAddressType address_type, int port) { + int socket_fd = CreateServerSocket(address_type); + if (socket_fd < 0) { + return -1; } SbSocketAddress address = {}; bool success = GetLocalhostAddress(address_type, port, &address); if (!success) { SB_LOG(ERROR) << "GetLocalhostAddress failed"; - return std::unique_ptr(); + return -1; } - SbSocketError result = socket->Bind(&address); - if (result != kSbSocketOk) { - SB_LOG(ERROR) << __FUNCTION__ << ": " - << "SbSocketBind to " << port << " failed: " << result; - return std::unique_ptr(); + + posix::SockAddr sock_addr; + if (!sock_addr.FromSbSocketAddress(&address)) { + SB_LOG(ERROR) << __FUNCTION__ << ": Invalid address"; + return -1; } - return socket; + SB_DCHECK(socket_fd >= 0); + SbSocketAddress* local_address = &address; + // When binding to the IPV6 any address, ensure that the IPV6_V6ONLY flag is + // off to allow incoming IPV4 connections on the same socket. + // See https://www.ietf.org/rfc/rfc3493.txt for details. + if (local_address && (local_address->type == kSbSocketAddressTypeIpv6) && + common::MemoryIsZero(local_address->address, 16)) { + int on = 0; + if (setsockopt(socket_fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) != + 0) { + // Silently ignore errors, assume the default behavior is as expected. + errno = 0; + } + } + + int result = + HANDLE_EINTR(bind(socket_fd, sock_addr.sockaddr(), sock_addr.length)); + if (result != 0) { + SB_LOG(ERROR) << __FUNCTION__ << ": Bind failed. errno=" << errno; + return -1; + } + return socket_fd; } // Creates a server socket that is bound and listening to the loopback interface // on the given port. -std::unique_ptr CreateListeningSocket(SbSocketAddressType address_type, - int port) { - std::unique_ptr socket = CreateLocallyBoundSocket(address_type, port); - if (!socket) { - return std::unique_ptr(); +int CreateListeningSocket(SbSocketAddressType address_type, int port) { + int socket_fd = CreateLocallyBoundSocket(address_type, port); + if (socket_fd < 0) { + return -1; } - SbSocketError result = socket->Listen(); - if (result != kSbSocketOk) { +#if defined(SOMAXCONN) + const int kMaxConn = SOMAXCONN; +#else + const int kMaxConn = 128; +#endif + int result = listen(socket_fd, kMaxConn); + if (result != 0) { SB_LOG(ERROR) << __FUNCTION__ << ": " << "SbSocketListen failed: " << result; - return std::unique_ptr(); + return -1; } - return socket; + return socket_fd; } // Gets the port socket is bound to. -bool GetBoundPort(Socket* socket, int* out_port) { +bool GetBoundPort(int socket, int* out_port) { SB_DCHECK(out_port); SB_DCHECK(socket); SbSocketAddress socket_address = {0}; - bool result = socket->GetLocalAddress(&socket_address); - if (!result) { + posix::SockAddr sock_addr; + int result = getsockname(socket, sock_addr.sockaddr(), &sock_addr.length); + if (result < 0) { + return false; + } + if (!sock_addr.ToSbSocketAddress(&socket_address)) { return false; } @@ -157,8 +225,7 @@ class LinkReceiver::Impl { private: // Encapsulates connection state. struct Connection { - explicit Connection(std::unique_ptr socket) - : socket(std::move(socket)) {} + explicit Connection(int socket_fd) { socket = std::move(socket_fd); } ~Connection() {} void FlushLink(Application* application) { if (!data.empty()) { @@ -167,7 +234,7 @@ class LinkReceiver::Impl { } } - std::unique_ptr socket; + int socket; std::string data; }; @@ -177,7 +244,7 @@ class LinkReceiver::Impl { // Adds |socket| to the SbSocketWaiter to wait until ready for accepting a new // connection. - bool AddForAccept(Socket* socket); + bool AddForAccept(int socket); // Adds the |connection| to the SbSocketWaiter to wait until ready to read // more data. @@ -187,7 +254,7 @@ class LinkReceiver::Impl { void OnAcceptReady(); // Called when the waiter reports that a socket has more data to read. - void OnReadReady(SbSocket sb_socket); + void OnReadReady(int socket_fd); // Called when the waiter reports that a connection has more data to read. void OnReadReady(Connection* connection); @@ -197,11 +264,11 @@ class LinkReceiver::Impl { // SbSocketWaiter entry points. static void HandleAccept(SbSocketWaiter waiter, - SbSocket socket, + int socket, void* context, int ready_interests); static void HandleRead(SbSocketWaiter waiter, - SbSocket socket, + int socket, void* context, int ready_interests); @@ -233,10 +300,10 @@ class LinkReceiver::Impl { Semaphore destroy_waiter_; // The server socket listening for new connections. - std::unique_ptr listen_socket_; + int listen_socket_; // A map of raw SbSockets to Connection objects. - std::unordered_map connections_; + std::unordered_map connections_; }; LinkReceiver::Impl::Impl(Application* application, int port) @@ -267,14 +334,15 @@ void LinkReceiver::Impl::Run() { waiter_initialized_.Put(); return; } + listen_socket_ = -1; listen_socket_ = CreateListeningSocket(kSbSocketAddressTypeIpv4, specified_port_); - if (!listen_socket_ || !listen_socket_->IsValid()) { + if (listen_socket_ < 0) { listen_socket_ = CreateListeningSocket(kSbSocketAddressTypeIpv6, specified_port_); } - if (!listen_socket_ || !listen_socket_->IsValid()) { + if (listen_socket_ < 0) { SB_LOG(WARNING) << "Unable to start LinkReceiver on port " << specified_port_ << "."; SbSocketWaiterDestroy(waiter_); @@ -284,7 +352,7 @@ void LinkReceiver::Impl::Run() { } actual_port_ = 0; - bool result = GetBoundPort(listen_socket_.get(), &actual_port_); + bool result = GetBoundPort(listen_socket_, &actual_port_); if (!result) { SB_LOG(WARNING) << "Unable to get LinkReceiver bound port."; SbSocketWaiterDestroy(waiter_); @@ -297,7 +365,7 @@ void LinkReceiver::Impl::Run() { snprintf(port_string, SB_ARRAY_SIZE(port_string), "%d", actual_port_); CreateTemporaryFile("link_receiver_port", port_string, strlen(port_string)); - if (!AddForAccept(listen_socket_.get())) { + if (!AddForAccept(listen_socket_)) { quit_.store(true); } @@ -307,22 +375,22 @@ void LinkReceiver::Impl::Run() { } for (auto& entry : connections_) { - SbSocketWaiterRemove(waiter_, entry.first); + SbPosixSocketWaiterRemove(waiter_, entry.first); delete entry.second; } connections_.clear(); - SbSocketWaiterRemove(waiter_, listen_socket_->socket()); + SbPosixSocketWaiterRemove(waiter_, listen_socket_); // Block until destroying thread will no longer reference waiter. destroy_waiter_.Take(); SbSocketWaiterDestroy(waiter_); } -bool LinkReceiver::Impl::AddForAccept(Socket* socket) { - if (!SbSocketWaiterAdd(waiter_, socket->socket(), this, - &LinkReceiver::Impl::HandleAccept, - kSbSocketWaiterInterestRead, true)) { +bool LinkReceiver::Impl::AddForAccept(int socket) { + if (!SbPosixSocketWaiterAdd(waiter_, socket, this, + &LinkReceiver::Impl::HandleAccept, + kSbSocketWaiterInterestRead, true)) { SB_LOG(ERROR) << __FUNCTION__ << ": " << "SbSocketWaiterAdd failed."; return false; @@ -331,9 +399,9 @@ bool LinkReceiver::Impl::AddForAccept(Socket* socket) { } bool LinkReceiver::Impl::AddForRead(Connection* connection) { - if (!SbSocketWaiterAdd(waiter_, connection->socket->socket(), this, - &LinkReceiver::Impl::HandleRead, - kSbSocketWaiterInterestRead, false)) { + if (!SbPosixSocketWaiterAdd(waiter_, connection->socket, this, + &LinkReceiver::Impl::HandleRead, + kSbSocketWaiterInterestRead, false)) { SB_LOG(ERROR) << __FUNCTION__ << ": " << "SbSocketWaiterAdd failed."; return false; @@ -342,25 +410,47 @@ bool LinkReceiver::Impl::AddForRead(Connection* connection) { } void LinkReceiver::Impl::OnAcceptReady() { - std::unique_ptr accepted_socket = - std::unique_ptr(listen_socket_->Accept()); + SB_DCHECK(listen_socket_ >= 0); + + int accepted_socket = HANDLE_EINTR(accept(listen_socket_, NULL, NULL)); + if (accepted_socket < 0) { + return; + } + + // All Starboard sockets are non-blocking, so let's ensure it. + if (!posix::SetNonBlocking(accepted_socket)) { + // Something went wrong, we'll clean up and return failure. + HANDLE_EINTR(close(accepted_socket)); + return; + } + SB_DCHECK(accepted_socket); Connection* connection = new Connection(std::move(accepted_socket)); - connections_.emplace(connection->socket->socket(), connection); + connections_.emplace(connection->socket, connection); AddForRead(connection); } -void LinkReceiver::Impl::OnReadReady(SbSocket sb_socket) { - auto iter = connections_.find(sb_socket); +void LinkReceiver::Impl::OnReadReady(int socket_fd) { + auto iter = connections_.find(socket_fd); SB_DCHECK(iter != connections_.end()); OnReadReady(iter->second); } void LinkReceiver::Impl::OnReadReady(Connection* connection) { - auto socket = connection->socket.get(); + int socket = connection->socket; char data[64] = {0}; - int read = socket->ReceiveFrom(data, SB_ARRAY_SIZE_INT(data), NULL); + int read = -1; + + const int kRecvFlags = 0; + + SB_DCHECK(socket >= 0); + + ssize_t bytes_read = recv(socket, data, SB_ARRAY_SIZE_INT(data), kRecvFlags); + if (bytes_read >= 0) { + read = static_cast(bytes_read); + } + int last_null = 0; for (int position = 0; position < read; ++position) { if (data[position] == '\0' || data[position] == '\n' || @@ -383,7 +473,7 @@ void LinkReceiver::Impl::OnReadReady(Connection* connection) { if (read == 0) { // Terminate connection. connection->FlushLink(application_); - connections_.erase(socket->socket()); + connections_.erase(socket); delete connection; return; } @@ -401,7 +491,7 @@ void* LinkReceiver::Impl::RunThread(void* context) { // static void LinkReceiver::Impl::HandleAccept(SbSocketWaiter waiter, - SbSocket socket, + int socket, void* context, int ready_interests) { SB_DCHECK(context); @@ -410,7 +500,7 @@ void LinkReceiver::Impl::HandleAccept(SbSocketWaiter waiter, // static void LinkReceiver::Impl::HandleRead(SbSocketWaiter waiter, - SbSocket socket, + int socket, void* context, int ready_interests) { SB_DCHECK(context);