From ebf09a553451f5129a6366333001af81f5908a6c Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Tue, 16 Jul 2024 09:15:12 +0200 Subject: [PATCH] Add more tests for DataPipe to verify clearing/closing the pipe while writing --- .../org/dstadler/audio/util/DataPipeTest.java | 169 ++++++++++++------ 1 file changed, 113 insertions(+), 56 deletions(-) diff --git a/src/test/java/org/dstadler/audio/util/DataPipeTest.java b/src/test/java/org/dstadler/audio/util/DataPipeTest.java index 2f68d4f..0d14696 100644 --- a/src/test/java/org/dstadler/audio/util/DataPipeTest.java +++ b/src/test/java/org/dstadler/audio/util/DataPipeTest.java @@ -1,9 +1,11 @@ package org.dstadler.audio.util; -import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.rng.UniformRandomProvider; +import org.apache.commons.rng.simple.RandomSource; import org.dstadler.commons.testing.TestHelpers; import org.dstadler.commons.testing.ThreadTestHelper; +import org.junit.After; import org.junit.Test; import java.io.IOException; @@ -11,8 +13,21 @@ import static org.junit.Assert.*; +@SuppressWarnings("BusyWait") public class DataPipeTest { - @Test + private static final UniformRandomProvider RNG = RandomSource.XO_RO_SHI_RO_128_PP.create(); + + private final AtomicReference exc = new AtomicReference<>(); + private final AtomicReference lastWrite = new AtomicReference<>(System.currentTimeMillis()); + + @After + public void tearDown() throws IOException { + if (exc.get() != null) { + throw exc.get(); + } + } + + @Test public void testDataPipeNotStarted() throws IOException, InterruptedException { DataPipe pipe = new DataPipe(); @@ -54,64 +69,54 @@ public void testDataPipeCreated() throws IOException, InterruptedException { private static final int NUMBER_OF_THREADS = 10; private static final int NUMBER_OF_TESTS = 2000; - @Test + @Test public void testMultipleThreads() throws Throwable { - DataPipe pipe = new DataPipe(); - - pipe.createPipe(); + try (DataPipe pipe = new DataPipe()) { - ThreadTestHelper helper = - new ThreadTestHelper(NUMBER_OF_THREADS, NUMBER_OF_TESTS); - - helper.executeTest((threadNum, itNum) -> { - int rnd = RandomUtils.nextInt(0, 6); - - switch (rnd) { - case 0: - assertTrue(pipe.isRunning()); - break; - case 1: - assertNotNull(pipe.getInput()); - break; - case 2: - pipe.write(new byte[0]); - break; - case 3: - assertTrue(pipe.clearBuffer()); - break; - case 4: - pipe.waitAllConsumed(); - break; - case 5: - synchronized (pipe) { - TestHelpers.ToStringTest(pipe); - } - break; - // close is not called to always be "running" - // however createPipe will re-create the pipe frequently - default: - fail("Unexpected random: " + rnd); - } - }); + pipe.createPipe(); + + ThreadTestHelper helper = + new ThreadTestHelper(NUMBER_OF_THREADS, NUMBER_OF_TESTS); + + helper.executeTest((threadNum, itNum) -> { + int rnd = RNG.nextInt(0, 6); + + switch (rnd) { + case 0: + assertTrue(pipe.isRunning()); + break; + case 1: + assertNotNull(pipe.getInput()); + break; + case 2: + pipe.write(new byte[0]); + break; + case 3: + assertTrue(pipe.clearBuffer()); + break; + case 4: + pipe.waitAllConsumed(); + break; + case 5: + synchronized (pipe) { + TestHelpers.ToStringTest(pipe); + } + break; + // close is not called to always be "running" + // however createPipe will re-create the pipe frequently + default: + fail("Unexpected random: " + rnd); + } + }); + } } @Test public void testReadAndWrite() throws IOException, InterruptedException { - AtomicReference exc = new AtomicReference<>(); - try (DataPipe pipe = new DataPipe()) { pipe.createPipe(); - Thread writer = new Thread(() -> { - for (int i = 0; i < 100000; i++) { - try { - pipe.write(new byte[]{(byte) (i % 256)}); - } catch (IOException e) { - exc.set(e); - } - } - }); - writer.start(); + final Thread writer = startWriterThread(pipe); Thread reader = new Thread(() -> { for (int i = 0; i < 100000; i++) { @@ -122,16 +127,12 @@ public void testReadAndWrite() throws IOException, InterruptedException { exc.set(e); } } - }); + }, "reader-thread"); reader.start(); writer.join(); reader.join(); - if (exc.get() != null) { - throw exc.get(); - } - assertEquals(0, pipe.getInput().available()); } } @@ -149,4 +150,60 @@ public void testClearBuffer() throws IOException { assertEquals(0, pipe.getInput().available()); } } + + @Test + public void testCloseBlockedWriting() throws IOException, InterruptedException { + try (DataPipe pipe = new DataPipe()) { + pipe.createPipe(); + + final Thread writer = startWriterThread(pipe); + + // wait until thread is blocked + while (lastWrite.get() >= System.currentTimeMillis() - 100) { + Thread.sleep(100); + } + + pipe.close(); + + writer.join(); + + // ignore exceptions, can happen in writer-thread here + exc.set(null); + } + } + + @Test + public void testClearBlockedWriting() throws IOException, InterruptedException { + try (DataPipe pipe = new DataPipe()) { + pipe.createPipe(); + + final Thread writer = startWriterThread(pipe); + + // wait until thread is blocked + while (lastWrite.get() >= System.currentTimeMillis() - 100) { + Thread.sleep(100); + } + + pipe.clearBuffer(); + + writer.join(); + } + } + + private Thread startWriterThread(DataPipe pipe) { + Thread writer = new Thread(() -> { + for (int i = 0; i < 100000; i++) { + try { + pipe.write(new byte[] { (byte) (i % 256) }); + + lastWrite.set(System.currentTimeMillis()); + } catch (IOException e) { + exc.set(e); + break; + } + } + }, "writer-thread"); + writer.start(); + return writer; + } } \ No newline at end of file