Skip to content

Commit

Permalink
Speed up LocalIncomingServerSessionTest
Browse files Browse the repository at this point in the history
The test teardown waits for inbound stream IDs to be registered with the session manager. Many streams (those that have errored out or have been replaced with an encrypted stream) won't ever be registered in the Session Manager. The test teardown doesn't need to wait for those.

This commit introduces a change that tracks the streams that are still in-flight. Test teardown will now only wait for those.
  • Loading branch information
guusdk committed Jan 4, 2025
1 parent d16bdf1 commit 2aa58ea
Showing 1 changed file with 40 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.*;
import java.util.concurrent.*;

public class RemoteInitiatingServerDummy extends AbstractRemoteServerDummy
Expand All @@ -53,6 +50,7 @@ public class RemoteInitiatingServerDummy extends AbstractRemoteServerDummy
boolean peerSupportsDialback;
private ExecutorService processingService;
private final List<StreamID> receivedStreamIDs = new ArrayList<>();
private final List<StreamID> processedStreamIDs = new ArrayList<>();
private final List<String> receivedStreamFromValues = new ArrayList<>();
private final List<String> receivedStreamToValues = new ArrayList<>();

Expand Down Expand Up @@ -101,24 +99,24 @@ public void blockUntilDone(final long timeout, final TimeUnit unit) {
protected void done()
{
log("Start being done");
if (!getReceivedStreamIDs().isEmpty()) {
if (!getNonProcessedStreamIDs().isEmpty()) {
// If we recorded a stream ID, wait for this stream to be registered in the session manager before
// continuing to prevent a race condition.
log("Wait for stream to be registered in the session manager");
final StreamID lastReceivedID = getNonProcessedStreamIDs().get(getNonProcessedStreamIDs().size()-1);
log("Wait for stream to be registered in the session manager: " + lastReceivedID);
final Instant stopWaiting = Instant.now().plus(500, ChronoUnit.MILLIS);
try {
final StreamID lastReceivedID = getReceivedStreamIDs().get(getReceivedStreamIDs().size()-1);
final SessionManager sessionManager = XMPPServer.getInstance().getSessionManager();
boolean found = false;
while (Instant.now().isBefore(stopWaiting)) {
if (sessionManager.getIncomingServerSession( lastReceivedID ) != null) {
log("Found stream registered in the session manager");
log("Found stream registered in the session manager: " + lastReceivedID);
found = true;
break;
}
Thread.sleep(10);
}
if (!found) log("NEVER FOUND STREAM WE WERE (pointlessly?) WAITING FOR!");
if (!found) log("NEVER FOUND STREAM WE WERE (pointlessly?) WAITING FOR: " + lastReceivedID);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -138,6 +136,7 @@ public void disconnect() throws InterruptedException, IOException
dialbackAuthoritativeServer.close();
dialbackAuthoritativeServer = null;
}
log("disconnected");
}

public synchronized void stopProcessingService() throws InterruptedException
Expand Down Expand Up @@ -189,6 +188,28 @@ public List<StreamID> getReceivedStreamIDs()
return receivedStreamIDs;
}

/**
* Returns all stream IDs that have been received, but have not yet been marked as being processed.
*
* @return Stream IDs still being processed.
*/
public List<StreamID> getNonProcessedStreamIDs()
{
final List<StreamID> result = new ArrayList<>(receivedStreamIDs);
result.removeAll(processedStreamIDs);
return result;
}

/**
* Mark the last received stream ID as being fully processed. This prevents the teardown from waiting for this stream
* to be established.
*/
public void markLastStreamIDasProcessed() {
final StreamID streamID = receivedStreamIDs.get(receivedStreamIDs.size() - 1);
processedStreamIDs.add(streamID);
log("Marked as processed: " + streamID);
}

/**
* Returns all stream 'from' attribute values (potential duplicates, but no null values) that were received from the
* peer during the setup.
Expand Down Expand Up @@ -451,14 +472,14 @@ public void run()
}
}
} while (!processingService.isShutdown() && allowableSocketTimeouts > 0);
log("Ending read loop.");
log("Ending read loop" + (socket instanceof SSLSocket ? " (encrypted)" : ""));
} catch (Throwable t) {
// Log exception only when not cleanly closed.
if (doLog && !processingService.isShutdown()) {
t.printStackTrace();
}
} finally {
log("Stopped reading from socket");
log("Stopped reading from socket" + (socket instanceof SSLSocket ? " (encrypted)" : ""));
done();
}
}
Expand Down Expand Up @@ -512,6 +533,8 @@ private boolean negotiateEncryption(final Element features) throws IOException
final Element error = root.addElement(QName.get("error", "stream", "http://etherx.jabber.org/streams"));
error.addElement(QName.get("undefined-condition", "urn:ietf:params:xml:ns:xmpp-streams"));

markLastStreamIDasProcessed(); // Prevents the code from waiting on this stream to be registered with session manager during test fixture teardown.

send(root.asXML().substring(root.asXML().indexOf(">")+1));
throw new InterruptedIOException("Openfire requires TLS, we disabled it.");
}
Expand All @@ -531,6 +554,8 @@ private boolean negotiateEncryption(final Element features) throws IOException
final Element error = root.addElement(QName.get("error", "stream", "http://etherx.jabber.org/streams"));
error.addElement(QName.get("undefined-condition", "urn:ietf:params:xml:ns:xmpp-streams"));

markLastStreamIDasProcessed(); // Prevents the code from waiting on this stream to be registered with session manager during test fixture teardown.

send(root.asXML().substring(root.asXML().indexOf(">")+1));
throw new InterruptedIOException("Openfire disabled TLS, we require it.");
}
Expand All @@ -548,6 +573,9 @@ private void initiateTLS() throws IOException {
log("Initiating TLS...");
final Document outbound = DocumentHelper.createDocument();
final Element startTls = outbound.addElement(QName.get("starttls", "urn:ietf:params:xml:ns:xmpp-tls"));

markLastStreamIDasProcessed(); // Prevents the code from waiting on the stream (to be replaced with an encrypted one) to be registered with session manager during test fixture teardown.

send(startTls.asXML());
}

Expand Down Expand Up @@ -600,6 +628,7 @@ private void negotiateAuthentication(final Element features) throws IOException
startDialbackAuth();
} else {
log("Unable to do authentication.");
markLastStreamIDasProcessed(); // Prevents the code from waiting on this stream to be registered with session manager during test fixture teardown.
throw new InterruptedIOException("Unable to do authentication.");
}
}
Expand Down

0 comments on commit 2aa58ea

Please sign in to comment.