Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a socket pool of multiple sockets bound to the same port #293

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions src/main/java/org/ice4j/ice/harvest/AbstractUdpListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.ice4j.*;
import org.ice4j.attribute.*;
import org.ice4j.ice.*;
import org.ice4j.message.*;
import org.ice4j.socket.*;
import org.ice4j.util.*;
Expand All @@ -37,7 +36,7 @@
import static org.ice4j.ice.harvest.HarvestConfig.config;

/**
* A class which holds a {@link DatagramSocket} and runs a thread
* A class which holds a {@link SocketPool} and runs a thread
* ({@link #thread}) which perpetually reads from it.
*
* When a datagram from an unknown source is received, it is parsed as a STUN
Expand Down Expand Up @@ -196,13 +195,18 @@ static String getUfrag(byte[] buf, int off, int len)
*/
protected final TransportAddress localAddress;

/**
* The pool of sockets available for writing.
*/
private final SocketPool socketPool;

/**
* The "main" socket that this harvester reads from.
*/
private final DatagramSocket socket;
private final DatagramSocket receiveSocket;

/**
* The thread reading from {@link #socket}.
* The thread reading from {@link #receiveSocket}.
*/
private final Thread thread;

Expand Down Expand Up @@ -236,28 +240,30 @@ protected AbstractUdpListener(TransportAddress localAddress)
);
}

socket = new DatagramSocket( tempAddress );
socketPool = new SocketPool( tempAddress, 0 );

receiveSocket = socketPool.getReceiveSocket();

Integer receiveBufferSize = config.udpReceiveBufferSize();
if (receiveBufferSize != null)
{
socket.setReceiveBufferSize(receiveBufferSize);
receiveSocket.setReceiveBufferSize(receiveBufferSize);
}

/* Update the port number if needed. */
if (localAddress.getPort() == 0)
{
tempAddress = new TransportAddress(
tempAddress.getAddress(),
socket.getLocalPort(),
receiveSocket.getLocalPort(),
tempAddress.getTransport()
);
}
this.localAddress = tempAddress;

String logMessage
= "Initialized AbstractUdpListener with address " + this.localAddress;
logMessage += ". Receive buffer size " + socket.getReceiveBufferSize();
logMessage += ". Receive buffer size " + receiveSocket.getReceiveBufferSize();
if (receiveBufferSize != null)
{
logMessage += " (asked for " + receiveBufferSize + ")";
Expand Down Expand Up @@ -292,11 +298,11 @@ public TransportAddress getLocalAddress()
public void close()
{
close = true;
socket.close(); // causes socket#receive to stop blocking.
socketPool.close(); // causes socket#receive to stop blocking.
}

/**
* Perpetually reads datagrams from {@link #socket} and handles them
* Perpetually reads datagrams from {@link #receiveSocket} and handles them
* accordingly.
*
* It is important that this blocks are little as possible (except on
Expand Down Expand Up @@ -326,7 +332,7 @@ private void runInHarvesterThread()

try
{
socket.receive(pkt);
receiveSocket.receive(pkt);
}
catch (IOException ioe)
{
Expand Down Expand Up @@ -376,13 +382,13 @@ private void runInHarvesterThread()
{
candidateSocket.close();
}
socket.close();
socketPool.close();
}

/**
* Read packets from the socket and forward them via the push API. Note that the memory model here is different
* than the other case. Specifically, we:
* 1. Receive from {@link #socket} into a fixed buffer
* 1. Receive from {@link #receiveSocket} into a fixed buffer
* 2. Obtain a buffer of the required size using {@link BufferPool#getBuffer}
* 3. Copy the data into the buffer and either
* 3.1 Call the associated {@link BufferHandler} if the packet is payload
Expand Down Expand Up @@ -410,7 +416,7 @@ private void runInHarvesterThreadPush()

try
{
socket.receive(pkt);
receiveSocket.receive(pkt);
receivedTime = clock.instant();
}
catch (IOException ioe)
Expand Down Expand Up @@ -467,7 +473,7 @@ private void runInHarvesterThreadPush()
{
candidateSocket.close();
}
socket.close();
socketPool.close();
}

private Buffer bufferFromPacket(DatagramPacket p, Instant receivedTime)
Expand All @@ -478,7 +484,7 @@ private Buffer bufferFromPacket(DatagramPacket p, Instant receivedTime)
System.arraycopy(p.getData(), p.getOffset(), buffer.getBuffer(), off, p.getLength());
buffer.setOffset(off);
buffer.setLength(p.getLength());
buffer.setLocalAddress(socket.getLocalSocketAddress());
buffer.setLocalAddress(receiveSocket.getLocalSocketAddress());
buffer.setRemoteAddress(p.getSocketAddress());
buffer.setReceivedTime(receivedTime);

Expand Down Expand Up @@ -808,14 +814,14 @@ public void receive(DatagramPacket p)
/**
* {@inheritDoc}
*
* Delegates to the actual socket of the harvester.
* Delegates to the socket pool.
*/
@Override
public void send(DatagramPacket p)
throws IOException
{
p.setSocketAddress(remoteAddress);
socket.send(p);
socketPool.send(p);
}
}
}
114 changes: 114 additions & 0 deletions src/main/kotlin/org/ice4j/socket/SocketPool.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright @ 2020 - Present, 8x8 Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ice4j.socket

import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.DatagramSocketImpl
import java.net.SocketAddress
import java.nio.channels.DatagramChannel

/** A pool of datagram sockets all bound on the same port.
*
* This is necessary to allow multiple threads to send packets simultaneously from the same source address,
* in JDK 15 and later, because the [DatagramChannel]-based implementation of [DatagramSocketImpl] introduced
* in that version locks the socket during a call to [DatagramSocket.send].
*
* (The old [DatagramSocketImpl] implementation can be used by setting the system property
* `jdk.net.usePlainDatagramSocketImpl` in JDK versions 15 through 17, but was removed in versions 18 and later.)
*
* This feature may also be useful on older JDK versions on non-Linux operating systems, such as macOS,
* which block simultaneous writes through the same UDP socket at the operating system level.
*
* The sockets are opened such that packets will be _received_ on exactly one socket.
*/
class SocketPool(
/** The address to which to bind the pool of sockets. */
address: SocketAddress,
/** The number of sockets to create for the pool. If this is set to zero (the default), the number
* will be set automatically to an appropriate value.
*/
requestedNumSockets: Int = 0
) {
init {
require(requestedNumSockets >= 0) { "RequestedNumSockets must be >= 0" }
}

internal class SocketAndIndex(
val socket: DatagramSocket,
var count: Int = 0
)

val numSockets: Int =
if (requestedNumSockets != 0) {
requestedNumSockets
} else {
// TODO: set this to 1 in situations where pools aren't needed?
2 * Runtime.getRuntime().availableProcessors()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest logging the number we end up using.

Why does it default to twice the number of processors? Running your test on an A2 with 8 CPUs I see optimal performance for 8 sockets.

}

private val sockets = buildList {
val multipleSockets = numSockets > 1
var bindAddr = address
for (i in 0 until numSockets) {
val sock = DatagramSocket(null)
if (multipleSockets) {
sock.reuseAddress = true
}
sock.bind(bindAddr)
if (i == 0 && multipleSockets) {
bindAddr = sock.localSocketAddress
}
add(SocketAndIndex(sock, 0))
}
}

/** The socket on which packets will be received. */
val receiveSocket: DatagramSocket
// On all platforms I've tested, the last-bound socket is the one which receives packets.
// TODO: should we support Linux's flavor of SO_REUSEPORT, in which packets can be received on *all* the
// sockets, spreading load?
get() = sockets.last().socket

fun send(packet: DatagramPacket) {
val sendSocket = getSendSocket()
sendSocket.socket.send(packet)
returnSocket(sendSocket)
}

/** Gets a socket on which packets can be sent, chosen from among all the available send sockets. */
internal fun getSendSocket(): SocketAndIndex {
if (numSockets == 1) {
return sockets.first()
}
synchronized(sockets) {
val min = sockets.minBy { it.count }
min.count++

return min
}
}

internal fun returnSocket(socket: SocketAndIndex) {
synchronized(sockets) {
socket.count--
}
}

fun close() {
sockets.forEach { it.socket.close() }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,51 +31,6 @@
*/
public class SinglePortUdpHarvesterTest
{
/**
* Verifies that, without closing, the address used by a harvester cannot be re-used.
*
* @see <a href="https://github.com/jitsi/ice4j/issues/139">https://github.com/jitsi/ice4j/issues/139</a>
*/
@Test
public void testRebindWithoutCloseThrows() throws Exception
{
// Setup test fixture.
final TransportAddress address = new TransportAddress( "127.0.0.1", 10000, Transport.UDP );
SinglePortUdpHarvester firstHarvester;
try
{
firstHarvester = new SinglePortUdpHarvester( address );
}
catch (BindException ex)
{
// This is not expected at this stage (the port is likely already in use by another process, voiding this
// test). Rethrow as a different exception than the BindException, that is expected to be thrown later in
// this test.
throw new Exception( "Test fixture is invalid.", ex );
}

// Execute system under test.
SinglePortUdpHarvester secondHarvester = null;
try
{
secondHarvester = new SinglePortUdpHarvester( address );
fail("expected BindException to be thrown at this point");
}
catch (BindException ex)
{
//expected, do nothing
}
finally
{
// Tear down
firstHarvester.close();
if (secondHarvester != null)
{
secondHarvester.close();
}
}
}

/**
* Verifies that, after closing, the address used by a harvester can be re-used.
*
Expand Down
Loading
Loading