Skip to content

Commit

Permalink
[Java] Introduce messageDelayNs property to pace sending of the mes…
Browse files Browse the repository at this point in the history
…sages. Defaults to 1us.
  • Loading branch information
vyazelenko committed Feb 8, 2024
1 parent 4a7fc1c commit 1553b9e
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ public FailoverTestRig(

private Configuration validate(final Configuration configuration)
{
if (configuration.batchSize() != 1)
{
throw new IllegalArgumentException("batchSize must be 1, but was " + configuration.batchSize());
}
if (configuration.messageLength() != ECHO_MESSAGE_LENGTH)
{
throw new IllegalArgumentException("messageLength must be " + ECHO_MESSAGE_LENGTH + ", but was " +
Expand All @@ -134,23 +130,19 @@ public void run() throws Exception

if (configuration.warmupIterations() > 0)
{
out.printf("%nRunning warmup for %,d iterations of %,d messages each, with %,d bytes payload and a" +
" burst size of %,d...%n",
out.printf("%nRunning warmup for %,d iterations of %,d messages each, with %,d bytes payload...%n",
configuration.warmupIterations(),
configuration.warmupMessageRate(),
configuration.messageLength(),
configuration.batchSize());
configuration.messageLength());
runTest(configuration.warmupIterations(), configuration.warmupMessageRate());

persistedHistogram.reset();
}

out.printf("%nRunning measurement for %,d iterations of %,d messages each, with %,d bytes payload and a" +
" burst size of %,d...%n",
out.printf("%nRunning measurement for %,d iterations of %,d messages each, with %,d bytes payload...%n",
configuration.iterations(),
configuration.messageRate(),
configuration.messageLength(),
configuration.batchSize());
configuration.messageLength());
failoverAt = clock.nanoTime() + TimeUnit.SECONDS.toNanos(1);
runTest(configuration.iterations(), configuration.messageRate());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ void smallMessage(final @TempDir Path tempDir) throws Exception
{
setProperty(SOURCE_CHANNELS_PROP_NAME, "aeron:udp?endpoint=localhost:13334|mtu=2k|term-length=64k");
setProperty(DESTINATION_CHANNELS_PROP_NAME, "aeron:udp?endpoint=localhost:13333|mtu=2k|term-length=64k");
test(10_000, 111, 10, tempDir);
test(10_000, 111, 5555, tempDir);
}

@Timeout(30)
@Test
void mediumMessage(final @TempDir Path tempDir) throws Exception
{
test(1000, 288, 5, tempDir);
test(1000, 288, 1234, tempDir);
}

@Timeout(30)
Expand All @@ -95,7 +95,7 @@ void largeMessage(final @TempDir Path tempDir) throws Exception
protected final void test(
final int messageRate,
final int messageLength,
final int burstSize,
final long messageDelayNs,
final Path tempDir) throws Exception
{
final Configuration configuration = new Configuration.Builder()
Expand All @@ -104,7 +104,7 @@ protected final void test(
.messageRate(messageRate)
.messageLength(messageLength)
.messageTransceiverClass(messageTransceiverClass())
.batchSize(burstSize)
.messageSendDelayNs(messageDelayNs)
.outputDirectory(tempDir)
.outputFileNamePrefix("aeron")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,27 @@ void after()
@Test
void messageLength32bytes(final @TempDir Path tempDir) throws Exception
{
test(10_000, 32, 10, tempDir);
test(10_000, 32, tempDir);
}

@Timeout(30)
@Test
void messageLength192bytes(final @TempDir Path tempDir) throws Exception
{
test(1000, 192, 5, tempDir);
test(1000, 192, tempDir);
}

@Timeout(30)
@Test
void messageLength1344bytes(final @TempDir Path tempDir) throws Exception
{
test(100, 1344, 1, tempDir);
test(100, 1344, tempDir);
}

@SuppressWarnings("MethodLength")
protected final void test(
final int messages,
final int messageLength,
final int burstSize,
final Path tempDir) throws Exception
{
final String aeronDirectoryName = tempDir.resolve("driver").toString();
Expand All @@ -113,7 +112,6 @@ protected final void test(
.messageRate(messages)
.messageLength(messageLength)
.messageTransceiverClass(ClusterMessageTransceiver.class)
.batchSize(burstSize)
.outputDirectory(tempDir)
.outputFileNamePrefix("aeron")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ void test() throws Exception
.messageRate(1000)
.messageLength(16)
.messageTransceiverClass(DummyMessageTransceiver.class)
.batchSize(1)
.outputDirectory(tempDir)
.outputFileNamePrefix("failover")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.HdrHistogram.ValueRecorder;
import org.agrona.AsciiEncoding;
import org.agrona.AsciiNumberFormatException;
import org.agrona.SystemUtil;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.NoOpIdleStrategy;
Expand All @@ -42,6 +43,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.*;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.agrona.BitUtil.SIZE_OF_LONG;
import static org.agrona.BitUtil.toHex;
import static org.agrona.Strings.isEmpty;
Expand Down Expand Up @@ -76,9 +79,9 @@ public final class Configuration
public static final int DEFAULT_ITERATIONS = 10;

/**
* Default number of messages in a single batch.
* Default message delay in nanoseconds.
*/
public static final int DEFAULT_BATCH_SIZE = 1;
public static final long DEFAULT_MESSAGE_DELAY_NS = MICROSECONDS.toNanos(1);

/**
* Default tracking of latency history
Expand Down Expand Up @@ -124,19 +127,19 @@ public final class Configuration
public static final String WARMUP_MESSAGE_RATE_PROP_NAME = "uk.co.real_logic.benchmarks.remote.warmup.message.rate";

/**
* Name of the system property to configure the batch size, i.e. number of messages to be sent in a single burst.
* Default value is {@link #DEFAULT_BATCH_SIZE}.
* Name of the system property to configure the message size in bytes. Default value is {@link #MIN_MESSAGE_LENGTH}.
*
* @see #batchSize()
* @see #messageLength()
*/
public static final String BATCH_SIZE_PROP_NAME = "uk.co.real_logic.benchmarks.remote.batch.size";
public static final String MESSAGE_LENGTH_PROP_NAME = "uk.co.real_logic.benchmarks.remote.message.length";

/**
* Name of the system property to configure the message size in bytes. Default value is {@link #MIN_MESSAGE_LENGTH}.
* Name of the system property to configure the minimal guaranteed message sending delay. Default value is
* {@link #DEFAULT_MESSAGE_DELAY_NS}. The actual send delay is calculated based on the message rate.
*
* @see #messageLength()
* @see #messageSendDelayNs()
*/
public static final String MESSAGE_LENGTH_PROP_NAME = "uk.co.real_logic.benchmarks.remote.message.length";
public static final String MESSAGE_SEND_DELAY_PROP_NAME = "uk.co.real_logic.benchmarks.remote.message.send.delay";

/**
* Name of the system property to configure the {@link IdleStrategy} to use when sending and receiving messages.
Expand Down Expand Up @@ -194,7 +197,6 @@ public final class Configuration
private final int iterations;
private final int warmupMessageRate;
private final int messageRate;
private final int batchSize;
private final int messageLength;
private final Class<? extends MessageTransceiver> messageTransceiverClass;
private final IdleStrategy idleStrategy;
Expand All @@ -203,18 +205,20 @@ public final class Configuration
private final String rate;
private final String outputFileNamePrefix;
private final boolean trackHistory;
private final long messageSendDelayNs;

private Configuration(final Builder builder)
{
this.warmupIterations = checkValueRange(
this.warmupIterations = (int)checkValueRange(
builder.warmupIterations, 0, Integer.MAX_VALUE, WARMUP_ITERATIONS_PROP_NAME);
this.iterations = checkValueRange(builder.iterations, 1, Integer.MAX_VALUE, ITERATIONS_PROP_NAME);
this.warmupMessageRate = checkValueRange(
this.iterations = (int)checkValueRange(builder.iterations, 1, Integer.MAX_VALUE, ITERATIONS_PROP_NAME);
this.warmupMessageRate = (int)checkValueRange(
builder.warmupMessageRate, 0, MAX_MESSAGE_RATE, WARMUP_MESSAGE_RATE_PROP_NAME);
this.messageRate = checkValueRange(builder.messageRate, 1, MAX_MESSAGE_RATE, MESSAGE_RATE_PROP_NAME);
this.batchSize = checkValueRange(builder.batchSize, 1, Integer.MAX_VALUE, BATCH_SIZE_PROP_NAME);
this.messageLength =
checkValueRange(builder.messageLength, MIN_MESSAGE_LENGTH, Integer.MAX_VALUE, MESSAGE_LENGTH_PROP_NAME);
this.messageRate = (int)checkValueRange(builder.messageRate, 1, MAX_MESSAGE_RATE, MESSAGE_RATE_PROP_NAME);
this.messageLength = (int)checkValueRange(
builder.messageLength, MIN_MESSAGE_LENGTH, Integer.MAX_VALUE, MESSAGE_LENGTH_PROP_NAME);
messageSendDelayNs = checkValueRange(
builder.messageSendDelayNs, 1, SECONDS.toNanos(1), MESSAGE_SEND_DELAY_PROP_NAME);
this.messageTransceiverClass = validateMessageTransceiverClass(builder.messageTransceiverClass);
this.idleStrategy = requireNonNull(builder.idleStrategy, "'" + IDLE_STRATEGY_PROP_NAME + "' cannot be null");
this.outputDirectory = validateOutputDirectory(builder.outputDirectory);
Expand Down Expand Up @@ -270,28 +274,23 @@ public int messageRate()
}

/**
* Size of the batch, i.e. number of messages to be sent in a single burst.
* <p>
* For example if the number of messages is {@code 1000} and the batch size is {code 1} then a single message will
* be sent every millisecond. However if the batch size is {@code 5} then a batch of five messages will be sent
* every five milliseconds.
* </p>
* Length in bytes of a single message.
*
* @return number of messages to be sent in a single burst, defaults to {@link #DEFAULT_BATCH_SIZE}.
* @return length in bytes of a single message, defaults to {@link #MIN_MESSAGE_LENGTH}.
*/
public int batchSize()
public int messageLength()
{
return batchSize;
return messageLength;
}

/**
* Length in bytes of a single message.
* Minimal delay in nanoseconds before sending the next message.
*
* @return length in bytes of a single message, defaults to {@link #MIN_MESSAGE_LENGTH}.
* @return delay in nanoseconds.
*/
public int messageLength()
public long messageSendDelayNs()
{
return messageLength;
return messageSendDelayNs;
}

/**
Expand Down Expand Up @@ -361,8 +360,8 @@ public String toString()
"\n warmupMessageRate=" + warmupMessageRate +
"\n iterations=" + iterations +
"\n messageRate=" + rate +
"\n batchSize=" + batchSize +
"\n messageLength=" + messageLength +
"\n messageSendDelayNs=" + messageSendDelayNs +
"\n messageTransceiverClass=" + messageTransceiverClass.getName() +
"\n idleStrategy=" + idleStrategy +
"\n outputDirectory=" + outputDirectory +
Expand Down Expand Up @@ -396,7 +395,6 @@ private String computeFileNamePrefix(final String outputFileNamePrefix, final Pr

return prefix +
"_rate=" + rate +
"_batch=" + batchSize +
"_length=" + messageLength +
"_sha=" + computeSha256(systemProperties);
}
Expand All @@ -410,14 +408,14 @@ public static final class Builder
private int iterations = DEFAULT_ITERATIONS;
private int warmupMessageRate = DEFAULT_WARMUP_MESSAGE_RATE;
private int messageRate;
private int batchSize = DEFAULT_BATCH_SIZE;
private int messageLength = MIN_MESSAGE_LENGTH;
private Class<? extends MessageTransceiver> messageTransceiverClass;
private IdleStrategy idleStrategy = NoOpIdleStrategy.INSTANCE;
private Path outputDirectory = Paths.get("results");
private Properties systemProperties = System.getProperties();
private String outputFileNamePrefix;
private boolean trackHistory = DEFAULT_TRACK_HISTORY;
private long messageSendDelayNs = DEFAULT_MESSAGE_DELAY_NS;

/**
* Set the number of warmup iterations.
Expand Down Expand Up @@ -467,18 +465,6 @@ public Builder messageRate(final int messageRate)
return this;
}

/**
* Set the batch size, i.e. number of messages to be sent at once in a single burst.
*
* @param size of a single batch of messages.
* @return this for a fluent API.
*/
public Builder batchSize(final int size)
{
this.batchSize = size;
return this;
}

/**
* Set the length of a single message in bytes. Must be at least {@link #MIN_MESSAGE_LENGTH} bytes long, since
* every message must contain a {@code timestamp} payload.
Expand Down Expand Up @@ -552,6 +538,18 @@ public Builder trackHistory(final boolean trackHistory)
return this;
}

/**
* Set the minimal message sending delay to control send batching.
*
* @param messageSendDelayNs delay in nanos.
* @return this for a fluent API.
*/
public Builder messageSendDelayNs(final long messageSendDelayNs)
{
this.messageSendDelayNs = messageSendDelayNs;
return this;
}

/**
* Create a new instance of the {@link Configuration} class from this builder.
*
Expand Down Expand Up @@ -592,14 +590,15 @@ public static Configuration fromSystemProperties()
builder.iterations(intProperty(ITERATIONS_PROP_NAME));
}

if (isPropertyProvided(BATCH_SIZE_PROP_NAME))
if (isPropertyProvided(MESSAGE_LENGTH_PROP_NAME))
{
builder.batchSize(intProperty(BATCH_SIZE_PROP_NAME));
builder.messageLength(intProperty(MESSAGE_LENGTH_PROP_NAME));
}

if (isPropertyProvided(MESSAGE_LENGTH_PROP_NAME))
if (isPropertyProvided(MESSAGE_SEND_DELAY_PROP_NAME))
{
builder.messageLength(intProperty(MESSAGE_LENGTH_PROP_NAME));
builder.messageSendDelayNs(SystemUtil.parseDuration(
MESSAGE_SEND_DELAY_PROP_NAME, System.getProperty(MESSAGE_SEND_DELAY_PROP_NAME)));
}

if (isPropertyProvided(IDLE_STRATEGY_PROP_NAME))
Expand Down Expand Up @@ -672,7 +671,8 @@ public static Path resolveLogsDir()
return directory;
}

private static int checkValueRange(final int value, final int minValue, final int maxValue, final String propName)
private static long checkValueRange(
final long value, final long minValue, final long maxValue, final String propName)
{
if (value < minValue)
{
Expand Down
Loading

0 comments on commit 1553b9e

Please sign in to comment.