Skip to content

Commit

Permalink
Add more tests for DataPipe to verify clearing/closing the pipe while…
Browse files Browse the repository at this point in the history
… writing
  • Loading branch information
centic9 committed Jul 16, 2024
1 parent 8d86b29 commit ebf09a5
Showing 1 changed file with 113 additions and 56 deletions.
169 changes: 113 additions & 56 deletions src/test/java/org/dstadler/audio/util/DataPipeTest.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
package org.dstadler.audio.util;


import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.rng.UniformRandomProvider;
import org.apache.commons.rng.simple.RandomSource;
import org.dstadler.commons.testing.TestHelpers;
import org.dstadler.commons.testing.ThreadTestHelper;
import org.junit.After;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.*;

@SuppressWarnings("BusyWait")
public class DataPipeTest {
@Test
private static final UniformRandomProvider RNG = RandomSource.XO_RO_SHI_RO_128_PP.create();

private final AtomicReference<IOException> exc = new AtomicReference<>();
private final AtomicReference<Long> lastWrite = new AtomicReference<>(System.currentTimeMillis());

@After
public void tearDown() throws IOException {
if (exc.get() != null) {
throw exc.get();
}
}

@Test
public void testDataPipeNotStarted() throws IOException, InterruptedException {
DataPipe pipe = new DataPipe();

Expand Down Expand Up @@ -54,64 +69,54 @@ public void testDataPipeCreated() throws IOException, InterruptedException {
private static final int NUMBER_OF_THREADS = 10;
private static final int NUMBER_OF_TESTS = 2000;

@Test
@Test
public void testMultipleThreads() throws Throwable {
DataPipe pipe = new DataPipe();

pipe.createPipe();
try (DataPipe pipe = new DataPipe()) {

ThreadTestHelper helper =
new ThreadTestHelper(NUMBER_OF_THREADS, NUMBER_OF_TESTS);

helper.executeTest((threadNum, itNum) -> {
int rnd = RandomUtils.nextInt(0, 6);

switch (rnd) {
case 0:
assertTrue(pipe.isRunning());
break;
case 1:
assertNotNull(pipe.getInput());
break;
case 2:
pipe.write(new byte[0]);
break;
case 3:
assertTrue(pipe.clearBuffer());
break;
case 4:
pipe.waitAllConsumed();
break;
case 5:
synchronized (pipe) {
TestHelpers.ToStringTest(pipe);
}
break;
// close is not called to always be "running"
// however createPipe will re-create the pipe frequently
default:
fail("Unexpected random: " + rnd);
}
});
pipe.createPipe();

ThreadTestHelper helper =
new ThreadTestHelper(NUMBER_OF_THREADS, NUMBER_OF_TESTS);

helper.executeTest((threadNum, itNum) -> {
int rnd = RNG.nextInt(0, 6);

switch (rnd) {
case 0:
assertTrue(pipe.isRunning());
break;
case 1:
assertNotNull(pipe.getInput());
break;
case 2:
pipe.write(new byte[0]);
break;
case 3:
assertTrue(pipe.clearBuffer());
break;
case 4:
pipe.waitAllConsumed();
break;
case 5:
synchronized (pipe) {
TestHelpers.ToStringTest(pipe);
}
break;
// close is not called to always be "running"
// however createPipe will re-create the pipe frequently
default:
fail("Unexpected random: " + rnd);
}
});
}
}

@Test
public void testReadAndWrite() throws IOException, InterruptedException {
AtomicReference<IOException> exc = new AtomicReference<>();

try (DataPipe pipe = new DataPipe()) {
pipe.createPipe();

Thread writer = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
try {
pipe.write(new byte[]{(byte) (i % 256)});
} catch (IOException e) {
exc.set(e);
}
}
});
writer.start();
final Thread writer = startWriterThread(pipe);

Thread reader = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
Expand All @@ -122,16 +127,12 @@ public void testReadAndWrite() throws IOException, InterruptedException {
exc.set(e);
}
}
});
}, "reader-thread");
reader.start();

writer.join();
reader.join();

if (exc.get() != null) {
throw exc.get();
}

assertEquals(0, pipe.getInput().available());
}
}
Expand All @@ -149,4 +150,60 @@ public void testClearBuffer() throws IOException {
assertEquals(0, pipe.getInput().available());
}
}

@Test
public void testCloseBlockedWriting() throws IOException, InterruptedException {
try (DataPipe pipe = new DataPipe()) {
pipe.createPipe();

final Thread writer = startWriterThread(pipe);

// wait until thread is blocked
while (lastWrite.get() >= System.currentTimeMillis() - 100) {
Thread.sleep(100);
}

pipe.close();

writer.join();

// ignore exceptions, can happen in writer-thread here
exc.set(null);
}
}

@Test
public void testClearBlockedWriting() throws IOException, InterruptedException {
try (DataPipe pipe = new DataPipe()) {
pipe.createPipe();

final Thread writer = startWriterThread(pipe);

// wait until thread is blocked
while (lastWrite.get() >= System.currentTimeMillis() - 100) {
Thread.sleep(100);
}

pipe.clearBuffer();

writer.join();
}
}

private Thread startWriterThread(DataPipe pipe) {
Thread writer = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
try {
pipe.write(new byte[] { (byte) (i % 256) });

lastWrite.set(System.currentTimeMillis());
} catch (IOException e) {
exc.set(e);
break;
}
}
}, "writer-thread");
writer.start();
return writer;
}
}

0 comments on commit ebf09a5

Please sign in to comment.