Skip to content

Commit

Permalink
Push API for payload (#285)
Browse files Browse the repository at this point in the history
* feat: Add and use a generic Buffer class.

* fix: Improve isStunPacket check.
Return early when the first two bits don't match. This is for:
1. Correctness: A packet that doesn't start with 00 is not STUN even if
   the magic cookie happens to match
2. Performance: Most packets are RTP and will fail the first byte check,
   so no need to check the magic cookie.

* feat: Add a push API for payload
When enabled packets are passed directly to the application,

* feat: Allow Component.send to send over a valid (but not selected) pair.

---------

Co-authored-by: Jonathan Lennox <[email protected]>
  • Loading branch information
bgrozev and JonathanLennox authored Oct 2, 2024
1 parent 4a3574e commit a982207
Show file tree
Hide file tree
Showing 7 changed files with 380 additions and 106 deletions.
30 changes: 30 additions & 0 deletions doc/push-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Purpose
The push API is a mode in which ice4j passes received payload packets to the application directly via a callback,
instead of making them available for reading from a virtual DatagramSocket. This avoids packets being copied into queues
handled by different threads, and is intended to reduce the workload and delay.

# Use
First, the API has to be enabled by setting `AbstractUdpListener.USE_PUSH_API` to true. This needs to be done before any
`SinglePortUdpHarvester`s are created.

A callback for payload packets needs to be configured for each `Component` using `setBufferCallback()`.

Finally, sending data should be done via `Component.send(byte[] data, int offset, int length)` instead of one of the
virtual sockets.

# Memory model
Buffers for each packet are allocated using `BufferPool.getBuffer`, which can be set externally. The default
implementation just allocates new memory on the java heap.

If a buffer is not passed to the application, it will be returned via `BufferPool.returnBuffer`. Otherwise, it is the
responsibility of the application.

Two new config options can be used to specify a non-zero offset and space to be left at the end of the buffers (which
could be used to e.g. make RTP processing more efficient):

```AbstractUdpListener.BYTES_TO_LEAVE_AT_START_OF_PACKET```
```AbstractUdpListener.BYTES_TO_LEAVE_AT_END_OF_PACKET```

# Limitations
The push API currently only supports `SinglePortUdpHarvester`. If the application uses regular `HostCandidate`s, it
has to read the packets from a `DatagramSocket`. Sending via `Component.send()` works either way.
88 changes: 84 additions & 4 deletions src/main/java/org/ice4j/ice/Component.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.ice4j.ice;

import java.beans.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

import org.ice4j.*;
import org.ice4j.ice.harvest.*;
import org.ice4j.socket.*;
import org.ice4j.util.*;
import org.jetbrains.annotations.*;
import org.jitsi.utils.logging2.*;

/**
Expand All @@ -39,7 +43,7 @@
* @author Boris Grozev
*/
public class Component
implements PropertyChangeListener
implements PropertyChangeListener, BufferHandler
{
/**
* The component ID to use with RTP streams.
Expand Down Expand Up @@ -149,9 +153,12 @@ public class Component
/**
* The set of pairs which this component wants to keep alive.
*/
private final Set<CandidatePair> keepAlivePairs
= Collections.newSetFromMap(
new ConcurrentHashMap<CandidatePair, Boolean>());
private final Set<CandidatePair> keepAlivePairs = Collections.newSetFromMap(new ConcurrentHashMap<>());

/**
* External callback for the push API. Called with every packet received via {@link #handleBuffer(Buffer)}.
*/
private BufferHandler bufferCallback = null;

/**
* Creates a new <tt>Component</tt> with the specified <tt>componentID</tt>
Expand Down Expand Up @@ -1188,4 +1195,77 @@ public Logger getLogger()
{
return logger;
}

/**
* Send a packet to the remote side. Uses the last used candidate pair to find the right socket and remote address.
*/
public void send(byte[] buffer, int offset, int length)
throws IOException
{
CandidatePair pair = getSelectedPair();
if (pair == null)
{
logger.debug("No selected pair, will try valid for sending");
pair = parentStream.getValidPair(this);
if (pair == null)
{
throw new IOException("No valid pair.");
}
}

LocalCandidate localCandidate = pair.getLocalCandidate();
if (localCandidate != null && localCandidate.getBase() != null)
{
localCandidate = localCandidate.getBase();
}
SocketAddress remoteAddress = pair.getRemoteCandidate().getTransportAddress();
IceSocketWrapper socket
= localCandidate == null ? null : localCandidate.getCandidateIceSocketWrapper(remoteAddress);

if (socket == null)
{
throw new IOException("No socket found to send on.");
}

DatagramPacket p = new DatagramPacket(buffer, offset, length);
p.setSocketAddress(remoteAddress);
socket.send(p);
}

/**
* Handle a buffer that was received from one of the sockets and should be forwarded to the application.
*/
@Override
public void handleBuffer(@NotNull Buffer buffer)
{
BufferHandler bufferCallback = this.bufferCallback;

if (bufferCallback == null)
{
logger.warn(
"The push API is used while no buffer callback has been set, dropping a packet (use-push-api="
+ AbstractUdpListener.USE_PUSH_API + ").");
BufferPool.returnBuffer.invoke(buffer);
return;
}

try
{
bufferCallback.handleBuffer(buffer);
}
catch (Exception e)
{
logger.warn("Buffer handling failed", e);
BufferPool.returnBuffer.invoke(buffer);
}
}

/**
* Set the external callback to be used for the push API.
* @param bufferCallback the external callback
*/
public void setBufferCallback(BufferHandler bufferCallback)
{
this.bufferCallback = bufferCallback;
}
}
3 changes: 1 addition & 2 deletions src/main/java/org/ice4j/ice/ConnectivityCheckServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ public void processRequest(StunMessageEvent evt)

if (respond)
{
response = MessageFactory.createBindingResponse(
request, evt.getRemoteAddress());
response = MessageFactory.createBindingResponse(request, evt.getRemoteAddress());
}
else
{
Expand Down
Loading

0 comments on commit a982207

Please sign in to comment.