Skip to content

Commit

Permalink
FLUME-1955. fileSuffix does not work with compressed streams.
Browse files Browse the repository at this point in the history
(Mike Percy via Hari Shreedharan)
  • Loading branch information
harishreedharan committed Mar 19, 2013
1 parent 4c5220b commit 2f6fea5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,9 @@ private void doOpen() throws IOException, InterruptedException {

String fullFileName = fileName + "." + counter;

if (codeC == null && fileSuffix != null && fileSuffix.length() > 0) {
if (fileSuffix != null && fileSuffix.length() > 0) {
fullFileName += fileSuffix;
}

if(codeC != null) {
} else if (codeC != null) {
fullFileName += codeC.getDefaultExtension();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ private static boolean codecMatches(Class<? extends CompressionCodec> cls,
return false;
}

private static CompressionCodec getCodec(String codecName) {
@VisibleForTesting
static CompressionCodec getCodec(String codecName) {
Configuration conf = new Configuration();
List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
.getCodecClasses(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,37 @@ public long currentTimeMillis() {
Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
}

@Test
public void testFileSuffixCompressed()
throws IOException, InterruptedException {
final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
final String suffix = ".foo";

MockHDFSWriter hdfsWriter = new MockHDFSWriter();
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
"/tmp", "file", "", ".tmp", suffix, HDFSEventSink.getCodec("gzip"),
SequenceFile.CompressionType.BLOCK, hdfsWriter,
timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
0, null, null, 30000, Executors.newSingleThreadExecutor());

// Need to override system time use for test so we know what to expect
final long testTime = System.currentTimeMillis();

Clock testClock = new Clock() {
public long currentTimeMillis() {
return testTime;
}
};
bucketWriter.setClock(testClock);

Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);

Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath()
.endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
}

@Test
public void testInUsePrefix() throws IOException, InterruptedException {
final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
Expand Down

0 comments on commit 2f6fea5

Please sign in to comment.