Skip to content

Commit

Permalink
add Bytes.collect
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Sep 2, 2016
1 parent 4a7fef8 commit fe0e121
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 78 deletions.
49 changes: 49 additions & 0 deletions src/main/java/com/github/davidmoten/rx/Bytes.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.davidmoten.rx;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand All @@ -13,8 +14,10 @@
import com.github.davidmoten.rx.util.ZippedEntry;

import rx.Observable;
import rx.Observable.Transformer;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.SyncOnSubscribe;
Expand Down Expand Up @@ -151,4 +154,50 @@ protected ZipInputStream next(ZipInputStream zis,
});
}

public static Transformer<byte[], byte[]> collect() {
return new Transformer<byte[], byte[]>() {

@Override
public Observable<byte[]> call(Observable<byte[]> source) {
return source.collect(BosCreatorHolder.INSTANCE, BosCollectorHolder.INSTANCE)
.map(BosToArrayHolder.INSTANCE);
}
};
}

private static final class BosCreatorHolder {

static final Func0<ByteArrayOutputStream> INSTANCE = new Func0<ByteArrayOutputStream>() {

@Override
public ByteArrayOutputStream call() {
return new ByteArrayOutputStream();
}
};
}

private static final class BosCollectorHolder {

static final Action2<ByteArrayOutputStream, byte[]> INSTANCE = new Action2<ByteArrayOutputStream, byte[]>() {

@Override
public void call(ByteArrayOutputStream bos, byte[] bytes ) {
try {
bos.write(bytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}

private static final class BosToArrayHolder {
static final Func1<ByteArrayOutputStream, byte[]> INSTANCE = new Func1<ByteArrayOutputStream, byte[]>() {
@Override
public byte[] call(ByteArrayOutputStream bos) {
return bos.toByteArray();
}
};
}

}
189 changes: 111 additions & 78 deletions src/test/java/com/github/davidmoten/rx/BytesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.File;
Expand All @@ -23,83 +24,115 @@

public class BytesTest {

@Test
public void testUnzip() {
List<String> list = Bytes.unzip(new File("src/test/resources/test.zip"))
.concatMap(new Func1<ZippedEntry, Observable<String>>() {

@Override
public Observable<String> call(ZippedEntry entry) {
return Observable.just(entry.getName()).concatWith(Strings.from(entry.getInputStream()));
}
}).toList().toBlocking().single();
assertEquals(Arrays.asList("document1.txt", "hello there", "document2.txt", "how are you going?"), list);
}

@Test
public void testUnzipPartial() {
InputStream is = BytesTest.class.getResourceAsStream("/test.zip");
assertNotNull(is);
List<String> list = Bytes.unzip(is).concatMap(new Func1<ZippedEntry, Observable<String>>() {

@Override
public Observable<String> call(ZippedEntry entry) {
try {
return Observable.just((char) entry.getInputStream().read() + "");
} catch (IOException e) {
return Observable.error(e);
}
}
}).toList().toBlocking().single();
assertEquals(Arrays.asList("h", "h"), list);
}

@Test
public void testUnzipExtractSpecificFile() {
List<String> list = Bytes.unzip(new File("src/test/resources/test.zip"))
.filter(new Func1<ZippedEntry, Boolean>() {

@Override
public Boolean call(ZippedEntry entry) {
return entry.getName().equals("document2.txt");
}
}).concatMap(new Func1<ZippedEntry, Observable<String>>() {

@Override
public Observable<String> call(ZippedEntry entry) {
return Strings.from(entry.getInputStream());
}
}).toList().toBlocking().single();
assertEquals(Arrays.asList("how are you going?"), list);
}

@Test
public void isUtilClass() {
Asserts.assertIsUtilityClass(Bytes.class);
}

@Test
public void testBytesFromFile() throws IOException {
File file = new File("target/testFromFile");
file.delete();
FileOutputStream out = new FileOutputStream(file);
out.write("abcdefg".getBytes());
out.close();
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
Bytes //
.from(file, 4) //
.doOnNext(new Action1<byte[]>() {
@Override
public void call(byte[] b) {
try {
bytes.write(b);
} catch (IOException e) {
throw new RuntimeException();
}
}
}).subscribe();
bytes.close();
assertArrayEquals("abcdefg".getBytes(), bytes.toByteArray());
}
@Test
public void testUnzip() {
List<String> list = Bytes.unzip(new File("src/test/resources/test.zip"))
.concatMap(new Func1<ZippedEntry, Observable<String>>() {

@Override
public Observable<String> call(ZippedEntry entry) {
return Observable.just(entry.getName())
.concatWith(Strings.from(entry.getInputStream()));
}
}).toList().toBlocking().single();
assertEquals(Arrays.asList("document1.txt", "hello there", "document2.txt",
"how are you going?"), list);
}

@Test
public void testUnzipPartial() {
InputStream is = BytesTest.class.getResourceAsStream("/test.zip");
assertNotNull(is);
List<String> list = Bytes.unzip(is).concatMap(new Func1<ZippedEntry, Observable<String>>() {

@Override
public Observable<String> call(ZippedEntry entry) {
try {
return Observable.just((char) entry.getInputStream().read() + "");
} catch (IOException e) {
return Observable.error(e);
}
}
}).toList().toBlocking().single();
assertEquals(Arrays.asList("h", "h"), list);
}

@Test
public void testUnzipExtractSpecificFile() {
List<String> list = Bytes.unzip(new File("src/test/resources/test.zip"))
.filter(new Func1<ZippedEntry, Boolean>() {

@Override
public Boolean call(ZippedEntry entry) {
return entry.getName().equals("document2.txt");
}
}).concatMap(new Func1<ZippedEntry, Observable<String>>() {

@Override
public Observable<String> call(ZippedEntry entry) {
return Strings.from(entry.getInputStream());
}
}).toList().toBlocking().single();
assertEquals(Arrays.asList("how are you going?"), list);
}

@Test
public void isUtilClass() {
Asserts.assertIsUtilityClass(Bytes.class);
}

@Test
public void testBytesFromFile() throws IOException {
File file = new File("target/testFromFile");
file.delete();
FileOutputStream out = new FileOutputStream(file);
out.write("abcdefg".getBytes());
out.close();
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
Bytes //
.from(file, 4) //
.doOnNext(new Action1<byte[]>() {
@Override
public void call(byte[] b) {
try {
bytes.write(b);
} catch (IOException e) {
throw new RuntimeException();
}
}
}).subscribe();
bytes.close();
assertArrayEquals("abcdefg".getBytes(), bytes.toByteArray());
}

@Test
public void testCollect() {
byte[] a = { 1, 2, 3 };
byte[] b = { 4, 5, 6 };
byte[] result = Observable //
.just(a, b) //
.compose(Bytes.collect()).toBlocking().single();
assertTrue(Arrays.equals(new byte[] { 1, 2, 3, 4, 5, 6 }, result));
}

@Test
public void testCollectWithEmpty() {
byte[] a = { 1, 2, 3 };
byte[] b = {};
byte[] result = Observable //
.just(a, b) //
.compose(Bytes.collect()).toBlocking().single();
assertTrue(Arrays.equals(new byte[] { 1, 2, 3 }, result));
}

@Test
public void testCollectWithEmpties() {
byte[] a = {};
byte[] b = {};
byte[] result = Observable //
.just(a, b) //
.compose(Bytes.collect()).toBlocking().single();
assertTrue(Arrays.equals(new byte[] {}, result));
}

}

0 comments on commit fe0e121

Please sign in to comment.