Skip to content

Commit

Permalink
Ensure correct mailbox configured. Ensure that full mailbox raises ex…
Browse files Browse the repository at this point in the history
…ception to prevent deadlock between send and deliver.

Signed-off-by: Brian Wehrle <[email protected]>
  • Loading branch information
bwehrle committed Jul 24, 2022
1 parent be1cf96 commit 26f1330
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ build/
*.ipr
.gradle/
out/
.java-version
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ public void resume(final String name) {

@Override
public void send(final Message message) {
for (int tries = 0; tries < totalSendRetries; ++tries) {
// This code causes a deadlock when (1) the queue is full and (2) the actor tries to send a message to itself.
// To avoid this, any write to full queue needs to raise an exception.
for (int tries = 0; tries < totalSendRetries; tries++) {
if (queue.offer(message)) {
if (notifyOnSend) {
dispatcher.execute(this);
}
return;
}
while (pendingMessages() >= queue.capacity()) ;
}
throw new IllegalStateException("Count not enqueue message due to busy mailbox.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@

package io.vlingo.xoom.actors.plugin.mailbox.agronampscarrayqueue;

import static org.junit.Assert.assertEquals;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;

import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.actors.ActorsTest;
import io.vlingo.xoom.actors.Definition;
import io.vlingo.xoom.actors.plugin.PluginProperties;
import io.vlingo.xoom.actors.plugin.completes.PooledCompletesPlugin;
import io.vlingo.xoom.actors.testkit.AccessSafely;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

public class ManyToOneConcurrentArrayQueueMailboxActorTest extends ActorsTest {
private static final int MailboxSize = 64;
Expand All @@ -47,20 +47,34 @@ public void testBasicDispatch() {
@Test
public void testOverflowDispatch() {
final TestResults testResults = new TestResults(MaxCount);

final CountTaker countTaker =
world.actorFor(
CountTaker.class,
Definition.has(CountTakerActor.class, Definition.parameters(testResults), "testArrayQueueMailbox", "countTaker-2"));

final int totalCount = MailboxSize * 2;

for (int count = 1; count <= totalCount; ++count) {
countTaker.take(count);
}
Definition.has(CountTakerActor.class,
Definition.parameters(testResults),
"testArrayQueueMailbox",
"countTaker-2"));

assertThrows(IllegalStateException.class, () -> {
for (int count = 1; count <= MailboxSize + 1; ++count) {
countTaker.take(count);
}
});
}

@Test
public void testMailboxIsConfigured() {
final TestResults testResults = new TestResults(MaxCount);
CountTaker countTaker =
world.actorFor(
CountTaker.class,
Definition.has(CountTakerActor.class,
Definition.parameters(testResults),
"testArrayQueueMailbox",
"countTaker"));

assertEquals(MaxCount, testResults.getHighest());
String setMailboxTypeName = world.stage().mailboxTypeNameOf(countTaker);
assertEquals("ManyToOneConcurrentArrayQueueMailbox", setMailboxTypeName);
}

@Before
Expand All @@ -77,15 +91,19 @@ public void setUp() throws Exception {
properties.setProperty("plugin.testArrayQueueMailbox.dispatcherThrottlingCount", "1");
properties.setProperty("plugin.testArrayQueueMailbox.sendRetires", "10");

ManyToOneConcurrentArrayQueuePlugin provider = new ManyToOneConcurrentArrayQueuePlugin();

ManyToOneConcurrentArrayQueuePlugin manyToOneConcurrentArrayQueuePlugin = new ManyToOneConcurrentArrayQueuePlugin();
final PluginProperties pluginProperties = new PluginProperties("testArrayQueueMailbox", properties);
final PooledCompletesPlugin plugin = new PooledCompletesPlugin();
plugin.configuration().buildWith(world.configuration(), pluginProperties);
final PooledCompletesPlugin pooledCompletesPlugin = new PooledCompletesPlugin();

pooledCompletesPlugin.configuration().buildWith(world.configuration(), pluginProperties);
pooledCompletesPlugin.start(world);

provider.start(world);
manyToOneConcurrentArrayQueuePlugin.configuration().buildWith(world.configuration(), pluginProperties);
manyToOneConcurrentArrayQueuePlugin.start(world);
}

public static interface CountTaker {
public interface CountTaker {
void take(final int count);
}

Expand Down

0 comments on commit 26f1330

Please sign in to comment.