From fe0e12119a51b697a6504b52626ac89ce992ae94 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Fri, 2 Sep 2016 12:05:03 +1000 Subject: [PATCH] add Bytes.collect --- .../java/com/github/davidmoten/rx/Bytes.java | 49 +++++ .../com/github/davidmoten/rx/BytesTest.java | 189 ++++++++++-------- 2 files changed, 160 insertions(+), 78 deletions(-) diff --git a/src/main/java/com/github/davidmoten/rx/Bytes.java b/src/main/java/com/github/davidmoten/rx/Bytes.java index cdb20f18..286651bb 100644 --- a/src/main/java/com/github/davidmoten/rx/Bytes.java +++ b/src/main/java/com/github/davidmoten/rx/Bytes.java @@ -1,6 +1,7 @@ package com.github.davidmoten.rx; import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -13,8 +14,10 @@ import com.github.davidmoten.rx.util.ZippedEntry; import rx.Observable; +import rx.Observable.Transformer; import rx.Observer; import rx.functions.Action1; +import rx.functions.Action2; import rx.functions.Func0; import rx.functions.Func1; import rx.observables.SyncOnSubscribe; @@ -151,4 +154,50 @@ protected ZipInputStream next(ZipInputStream zis, }); } + public static Transformer collect() { + return new Transformer() { + + @Override + public Observable call(Observable source) { + return source.collect(BosCreatorHolder.INSTANCE, BosCollectorHolder.INSTANCE) + .map(BosToArrayHolder.INSTANCE); + } + }; + } + + private static final class BosCreatorHolder { + + static final Func0 INSTANCE = new Func0() { + + @Override + public ByteArrayOutputStream call() { + return new ByteArrayOutputStream(); + } + }; + } + + private static final class BosCollectorHolder { + + static final Action2 INSTANCE = new Action2() { + + @Override + public void call(ByteArrayOutputStream bos, byte[] bytes ) { + try { + bos.write(bytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + + private static final class BosToArrayHolder { + static final Func1 INSTANCE = new Func1() { + @Override + public byte[] call(ByteArrayOutputStream bos) { + return bos.toByteArray(); + } + }; + } + } diff --git a/src/test/java/com/github/davidmoten/rx/BytesTest.java b/src/test/java/com/github/davidmoten/rx/BytesTest.java index 4a86fc85..fcd12243 100644 --- a/src/test/java/com/github/davidmoten/rx/BytesTest.java +++ b/src/test/java/com/github/davidmoten/rx/BytesTest.java @@ -3,6 +3,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.File; @@ -23,83 +24,115 @@ public class BytesTest { - @Test - public void testUnzip() { - List list = Bytes.unzip(new File("src/test/resources/test.zip")) - .concatMap(new Func1>() { - - @Override - public Observable call(ZippedEntry entry) { - return Observable.just(entry.getName()).concatWith(Strings.from(entry.getInputStream())); - } - }).toList().toBlocking().single(); - assertEquals(Arrays.asList("document1.txt", "hello there", "document2.txt", "how are you going?"), list); - } - - @Test - public void testUnzipPartial() { - InputStream is = BytesTest.class.getResourceAsStream("/test.zip"); - assertNotNull(is); - List list = Bytes.unzip(is).concatMap(new Func1>() { - - @Override - public Observable call(ZippedEntry entry) { - try { - return Observable.just((char) entry.getInputStream().read() + ""); - } catch (IOException e) { - return Observable.error(e); - } - } - }).toList().toBlocking().single(); - assertEquals(Arrays.asList("h", "h"), list); - } - - @Test - public void testUnzipExtractSpecificFile() { - List list = Bytes.unzip(new File("src/test/resources/test.zip")) - .filter(new Func1() { - - @Override - public Boolean call(ZippedEntry entry) { - return entry.getName().equals("document2.txt"); - } - }).concatMap(new Func1>() { - - @Override - public Observable call(ZippedEntry entry) { - return Strings.from(entry.getInputStream()); - } - }).toList().toBlocking().single(); - assertEquals(Arrays.asList("how are you going?"), list); - } - - @Test - public void isUtilClass() { - Asserts.assertIsUtilityClass(Bytes.class); - } - - @Test - public void testBytesFromFile() throws IOException { - File file = new File("target/testFromFile"); - file.delete(); - FileOutputStream out = new FileOutputStream(file); - out.write("abcdefg".getBytes()); - out.close(); - final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - Bytes // - .from(file, 4) // - .doOnNext(new Action1() { - @Override - public void call(byte[] b) { - try { - bytes.write(b); - } catch (IOException e) { - throw new RuntimeException(); - } - } - }).subscribe(); - bytes.close(); - assertArrayEquals("abcdefg".getBytes(), bytes.toByteArray()); - } + @Test + public void testUnzip() { + List list = Bytes.unzip(new File("src/test/resources/test.zip")) + .concatMap(new Func1>() { + + @Override + public Observable call(ZippedEntry entry) { + return Observable.just(entry.getName()) + .concatWith(Strings.from(entry.getInputStream())); + } + }).toList().toBlocking().single(); + assertEquals(Arrays.asList("document1.txt", "hello there", "document2.txt", + "how are you going?"), list); + } + + @Test + public void testUnzipPartial() { + InputStream is = BytesTest.class.getResourceAsStream("/test.zip"); + assertNotNull(is); + List list = Bytes.unzip(is).concatMap(new Func1>() { + + @Override + public Observable call(ZippedEntry entry) { + try { + return Observable.just((char) entry.getInputStream().read() + ""); + } catch (IOException e) { + return Observable.error(e); + } + } + }).toList().toBlocking().single(); + assertEquals(Arrays.asList("h", "h"), list); + } + + @Test + public void testUnzipExtractSpecificFile() { + List list = Bytes.unzip(new File("src/test/resources/test.zip")) + .filter(new Func1() { + + @Override + public Boolean call(ZippedEntry entry) { + return entry.getName().equals("document2.txt"); + } + }).concatMap(new Func1>() { + + @Override + public Observable call(ZippedEntry entry) { + return Strings.from(entry.getInputStream()); + } + }).toList().toBlocking().single(); + assertEquals(Arrays.asList("how are you going?"), list); + } + + @Test + public void isUtilClass() { + Asserts.assertIsUtilityClass(Bytes.class); + } + + @Test + public void testBytesFromFile() throws IOException { + File file = new File("target/testFromFile"); + file.delete(); + FileOutputStream out = new FileOutputStream(file); + out.write("abcdefg".getBytes()); + out.close(); + final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + Bytes // + .from(file, 4) // + .doOnNext(new Action1() { + @Override + public void call(byte[] b) { + try { + bytes.write(b); + } catch (IOException e) { + throw new RuntimeException(); + } + } + }).subscribe(); + bytes.close(); + assertArrayEquals("abcdefg".getBytes(), bytes.toByteArray()); + } + + @Test + public void testCollect() { + byte[] a = { 1, 2, 3 }; + byte[] b = { 4, 5, 6 }; + byte[] result = Observable // + .just(a, b) // + .compose(Bytes.collect()).toBlocking().single(); + assertTrue(Arrays.equals(new byte[] { 1, 2, 3, 4, 5, 6 }, result)); + } + + @Test + public void testCollectWithEmpty() { + byte[] a = { 1, 2, 3 }; + byte[] b = {}; + byte[] result = Observable // + .just(a, b) // + .compose(Bytes.collect()).toBlocking().single(); + assertTrue(Arrays.equals(new byte[] { 1, 2, 3 }, result)); + } + + @Test + public void testCollectWithEmpties() { + byte[] a = {}; + byte[] b = {}; + byte[] result = Observable // + .just(a, b) // + .compose(Bytes.collect()).toBlocking().single(); + assertTrue(Arrays.equals(new byte[] {}, result)); + } }