Skip to content

Commit

Permalink
PIG-1316: TextLoader should use Bzip2TextInputFormat for bzip files so that bzip files can be efficiently processed by splitting the files (pradeepkth)

Browse files Browse the repository at this point in the history

git-svn-id: https://svn.apache.org/repos/asf/hadoop/pig/trunk@928079 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Pradeep Kamath committed Mar 26, 2010
1 parent b5501a1 commit fdea21a
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 32 deletions.
4 changes: 2 additions & 2 deletions lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.tools.bzip2r.CBZip2InputStream;

@SuppressWarnings("unchecked")
public class Bzip2TextInputFormat extends FileInputFormat {
public class Bzip2TextInputFormat extends PigFileInputFormat {

/**
* Treats keys as offset in file and value as line. Since the input file is
Expand Down
11 changes: 9 additions & 2 deletions src/org/apache/pig/builtin/TextLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
Expand All @@ -45,6 +46,7 @@
public class TextLoader extends LoadFunc implements LoadCaster {
protected RecordReader in = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
private String loadLocation;

@Override
public Tuple getNext() throws IOException {
Expand Down Expand Up @@ -198,7 +200,11 @@ public byte[] toBytes(Tuple t) throws IOException {

@Override
public InputFormat getInputFormat() {
    // Choose the input format from the extension of the location recorded
    // in setLocation(): bzip-compressed input (".bz"/".bz2") is read with
    // Bzip2TextInputFormat so the compressed file can be split and
    // processed in parallel; everything else uses PigTextInputFormat.
    // NOTE(review): the diff scrape had left the removed line
    // "return new TextInputFormat();" interleaved here; it is dropped to
    // match the committed (post-diff) implementation.
    if (loadLocation.endsWith("bz2") || loadLocation.endsWith("bz")) {
        return new Bzip2TextInputFormat();
    } else {
        return new PigTextInputFormat();
    }
}

@Override
Expand All @@ -213,6 +219,7 @@ public void prepareToRead(RecordReader reader, PigSplit split) {

@Override
public void setLocation(String location, Job job) throws IOException {
    // Remember the load location so getInputFormat() can later select the
    // bzip-aware input format when the path ends in ".bz"/".bz2".
    loadLocation = location;
    FileInputFormat.setInputPaths(job, location);
}

Expand Down
90 changes: 65 additions & 25 deletions test/org/apache/pig/test/TestBZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
Expand Down Expand Up @@ -235,17 +236,28 @@ public void testEmptyBzip() throws Exception {
* Tests the case where a bzip block ends exactly at the end of the {@link InputSplit}
* with the block header ending a few bits into the last byte of current
* InputSplit. This case results in dropped records in Pig 0.6 release
* This test also tests that bzip files couple of dirs deep can be read by
* specifying the top level dir.
*/
@Test
public void testBlockHeaderEndingAtSplitNotByteAligned() throws IOException {
// the actual input file is at
// test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
// In this test we will load test/org/apache/pig/test/data/bzipdir1.bz2 to also
// test that the BZip2TextInputFormat can read subdirs recursively
String inputFileName =
"test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2";
"test/org/apache/pig/test/data/bzipdir1.bz2";
Long expectedCount = 74999L; // number of lines in above file
// the first block in the above file exactly ends a few bits into the
// byte at position 136500
int splitSize = 136500;
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
testCount(inputFileName, expectedCount, splitSize);
try {
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
testCount(inputFileName, expectedCount, splitSize, "TextLoader()");
} finally {
Util.deleteFile(cluster, inputFileName);
}
}

/**
Expand All @@ -262,47 +274,75 @@ public void testBlockHeaderEndingWithCR() throws IOException {
Long expectedCount = 82094L;
// the first block in the above file exactly ends at the byte at
// position 136498 and the last byte is a carriage return ('\r')
int splitSize = 136498;
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
testCount(inputFileName, expectedCount, splitSize);
try {
int splitSize = 136498;
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
} finally {
Util.deleteFile(cluster, inputFileName);
}
}

/**
* Tests the case where a bzip block ends exactly at the end of the input
* split and has more data which results in overcounting (record duplication)
* in Pig 0.6
*
*/
@Test
public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {

    String inputFileName =
        "test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
    Long expectedCount = 1041046L; // number of lines in above file
    // the first block in the above file exactly ends a few bits into the
    // byte at position 136500
    int splitSize = 136500;
    // try/finally guarantees the copied input is removed from the cluster
    // even when an assertion inside testCount() fails
    try {
        Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
        testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
    } finally {
        Util.deleteFile(cluster, inputFileName);
    }
}

private void testCount(String inputFileName, Long expectedCount,
int splitSize) throws IOException {
try {
String script = "a = load '" + inputFileName + "';" +
"b = group a all;" +
"c = foreach b generate COUNT_STAR(a);";
Properties props = new Properties();
for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
props.put(entry.getKey(), entry.getValue());
int splitSize, String loadFuncSpec) throws IOException {
String outputFile = "/tmp/bz-output";
// simple load-store script to verify that the bzip input is getting
// split
String scriptToTestSplitting = "a = load '" +inputFileName + "' using " +
loadFuncSpec + "; store a into '" + outputFile + "';";

String script = "a = load '" + inputFileName + "';" +
"b = group a all;" +
"c = foreach b generate COUNT_STAR(a);";
Properties props = new Properties();
for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
props.setProperty("mapred.max.split.size", Integer.toString(splitSize));
PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
PigServer pig = new PigServer(pigContext);
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props));
fs.delete(new Path(outputFile), true);
Util.registerMultiLineQuery(pig, scriptToTestSplitting);

// verify that > 1 maps were launched due to splitting of the bzip input
FileStatus[] files = fs.listStatus(new Path(outputFile));
int numPartFiles = 0;
for (FileStatus fileStatus : files) {
if(fileStatus.getPath().getName().startsWith("part")) {
numPartFiles++;
}
props.setProperty("mapred.max.split.size", Integer.toString(splitSize));
PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
PigServer pig = new PigServer(pigContext);
Util.registerMultiLineQuery(pig, script);
Iterator<Tuple> it = pig.openIterator("c");
Long result = (Long) it.next().get(0);
assertEquals(expectedCount, result);
} finally {
Util.deleteFile(cluster, inputFileName);
}
assertEquals(true, numPartFiles > 0);

// verify record count to verify we read bzip data correctly
Util.registerMultiLineQuery(pig, script);
Iterator<Tuple> it = pig.openIterator("c");
Long result = (Long) it.next().get(0);
assertEquals(expectedCount, result);

}
}
14 changes: 11 additions & 3 deletions test/org/apache/pig/test/TestBuiltin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1377,15 +1377,23 @@ public void testLFBin() throws Exception {
}
*/


/**
* test {@link TextLoader} - this also tests that {@link TextLoader} is capable
* of reading data a couple of dirs deep when the input specified is the top
* level directory
*/
@Test
public void testLFText() throws Exception {
String input1 = "This is some text.\nWith a newline in it.\n";
String expected1 = "This is some text.";
String expected2 = "With a newline in it.";
Util.createInputFile(cluster, "testLFTest-input1.txt", new String[] {input1});
Util.createInputFile(cluster,
"/tmp/testLFTextdir1/testLFTextdir2/testLFTest-input1.txt",
new String[] {input1});
// check that loading the top level dir still reading the file a couple
// of subdirs below
LoadFunc text1 = new ReadToEndLoader(new TextLoader(), ConfigurationUtil.
toConfiguration(cluster.getProperties()), "testLFTest-input1.txt", 0);
toConfiguration(cluster.getProperties()), "/tmp/testLFTextdir1", 0);
Tuple f1 = text1.getNext();
Tuple f2 = text1.getNext();
assertTrue(expected1.equals(f1.get(0).toString()) &&
Expand Down

0 comments on commit fdea21a

Please sign in to comment.