Skip to content

Commit

Permalink
PIG-1304: Fail underlying M/R jobs when concatenated gzip and bz2 fil…
Browse files Browse the repository at this point in the history
…es are provided as input

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1070662 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Richard Ding committed Feb 14, 2011
1 parent a340046 commit 2505be2
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ PIG-1696: Performance: Use System.arraycopy() instead of manually copying the by

BUG FIXES

PIG-1304: Fail underlying M/R jobs when concatenated gzip and bz2 files are provided as input (laukik via rding)

PIG-1852: Packaging antlr jar with pig.jar (rding via daijy)

PIG-1717 pig needs to call setPartitionFilter if schema is null but
Expand Down
9 changes: 4 additions & 5 deletions lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,6 @@ public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
initBlock(blockSize != -1);
setupBlock();
}

public CBZip2InputStream(FSDataInputStream zStream) throws IOException {
this(zStream, -1, Long.MAX_VALUE);
}

@Override
public int read() throws IOException {
Expand Down Expand Up @@ -408,7 +404,10 @@ private void complete() throws IOException {
storedCombinedCRC != computedCombinedCRC) {
crcError();
}

if (innerBsStream.getPos() < endOffsetOfSplit) {
throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
+ "Loading of concatenated bz2 files is not supported");
}
bsFinishedWithStream();
streamEnd = true;
}
Expand Down
115 changes: 111 additions & 4 deletions test/org/apache/pig/test/TestBZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -91,7 +95,7 @@ public void testBzipInPig() throws Exception {
pig.getPigContext().getProperties()));
FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() +
"/part-r-00000.bz2"));
CBZip2InputStream cis = new CBZip2InputStream(is);
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());

// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
Expand Down Expand Up @@ -150,7 +154,7 @@ public void testBzipInPig2() throws Exception {
pig.getPigContext().getProperties()));
FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() +
"/part-r-00000.bz2"));
CBZip2InputStream cis = new CBZip2InputStream(is);
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());

// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
Expand Down Expand Up @@ -266,7 +270,7 @@ public void testEmptyBzipInPig() throws Exception {
pig.getPigContext().getProperties()));
FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() +
"/part-r-00000.bz2"));
CBZip2InputStream cis = new CBZip2InputStream(is);
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());

// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
Expand Down Expand Up @@ -294,7 +298,7 @@ public void testEmptyBzip() throws Exception {
assertNotSame(0, tmp.length());
FileSystem fs = FileSystem.getLocal(new Configuration(false));
CBZip2InputStream cis = new CBZip2InputStream(
fs.open(new Path(tmp.getAbsolutePath())));
fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length());
assertEquals(-1, cis.read(new byte[100]));
cis.close();
tmp.delete();
Expand Down Expand Up @@ -471,4 +475,107 @@ public void testBzipStoreInMultiQuery2() throws Exception {
assertTrue(stat.getLen() > 0);
}

/**
* Tests that Pig throws an Exception when the input files to be loaded are actually
* a result of concatenating 2 or more bz2 files. Pig should not silently ignore part
* of the input data.
*/
@Test (expected=IOException.class)
public void testBZ2Concatenation() throws Exception {
String[] inputData1 = new String[] {
"1\ta",
"2\taa"
};
String[] inputData2 = new String[] {
"1\tb",
"2\tbb"
};
String[] inputDataMerged = new String[] {
"1\ta",
"2\taa",
"1\tb",
"2\tbb"
};

// bzip compressed input file1
File in1 = File.createTempFile("junit", ".bz2");
String compressedInputFileName1 = in1.getAbsolutePath();
in1.deleteOnExit();

// file2
File in2 = File.createTempFile("junit", ".bz2");
String compressedInputFileName2 = in2.getAbsolutePath();
in1.deleteOnExit();

String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);

try {
CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in1));
for (int i = 0; i < inputData1.length; i++) {
StringBuffer sb = new StringBuffer();
sb.append(inputData1[i]).append("\n");
byte bytes[] = sb.toString().getBytes();
cos.write(bytes);
}
cos.close();

CBZip2OutputStream cos2 =
new CBZip2OutputStream(new FileOutputStream(in2));
for (int i = 0; i < inputData2.length; i++) {
StringBuffer sb = new StringBuffer();
sb.append(inputData2[i]).append("\n");
byte bytes[] = sb.toString().getBytes();
cos2.write(bytes);
}
cos2.close();

// cat
catInto(compressedInputFileName2, compressedInputFileName1);
Util.copyFromLocalToCluster(cluster, compressedInputFileName1,
compressedInputFileName1);

// pig script to read uncompressed input
String script = "a = load '" + unCompressedInputFileName +"';";
PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
.getProperties());
pig.registerQuery(script);
Iterator<Tuple> it1 = pig.openIterator("a");

// pig script to read compressed concatenated input
script = "a = load '" + compressedInputFileName1 +"';";
pig.registerQuery(script);
Iterator<Tuple> it2 = pig.openIterator("a");

while(it1.hasNext()) {
Tuple t1 = it1.next();
Tuple t2 = it2.next();
Assert.assertEquals(t1, t2);
}

Assert.assertFalse(it2.hasNext());

} finally {
in1.delete();
in2.delete();
Util.deleteFile(cluster, unCompressedInputFileName);
}

}

/*
* Concatenate the contents of src file to the contents of dest file
*/
private void catInto(String src, String dest) throws IOException {
BufferedWriter out = new BufferedWriter(new FileWriter(dest, true));
BufferedReader in = new BufferedReader(new FileReader(src));
String str;
while ((str = in.readLine()) != null) {
out.write(str);
}
in.close();
out.close();
}

}

0 comments on commit 2505be2

Please sign in to comment.