FLUME-2922. Sync SequenceFile.Writer before calling hflush
This closes apache#52

(Kevin Conaway via Mike Percy)
Kevin Conaway authored and mpercy committed Jul 20, 2016
1 parent 9965dae commit 358bb67
Showing 2 changed files with 96 additions and 0 deletions.
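
In short, the fix makes the HDFS sink's sync() call SequenceFile.Writer.sync() before flushing the underlying output stream. With block compression the writer buffers records until a sync point (or close) forces the compressed block out, so flushing the stream alone can leave the latest batch invisible to readers of the still-open file; that is the behavior the new test below exercises. A minimal sketch of the fixed flush path, with explanatory comments added here (writer, outStream, and hflushOrSync are the existing field names and helper shown in the diff; hflushOrSync is assumed to call hflush() and fall back to the older sync() API on old Hadoop versions):

    @Override
    public void sync() throws IOException {
      // Ask the SequenceFile.Writer for a sync point; for block-compressed
      // files this writes out any buffered records plus a sync marker.
      writer.sync();
      // Then flush the underlying FSDataOutputStream so the bytes just
      // written become visible to readers while the file is still open.
      hflushOrSync(outStream);
    }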
@@ -109,6 +109,7 @@ public void append(Event e) throws IOException {

@Override
public void sync() throws IOException {
writer.sync();
hflushOrSync(outStream);
}

@@ -23,8 +23,10 @@
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.CharsetDecoder;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -1325,6 +1327,99 @@ public void testCloseOnIdle() throws IOException, EventDeliveryException, Interr
fs.close();
}

/**
* This test simulates what happens when a batch of events is written to a compressed sequence
* file (and thus hsync'd to hdfs) but the file is not yet closed.
*
* When this happens, the data that we wrote should still be readable.
*/
@Test
public void testBlockCompressSequenceFileWriterSync() throws IOException, EventDeliveryException {
String hdfsPath = testPath + "/sequenceFileWriterSync";
FileSystem fs = FileSystem.get(new Configuration());
// Since we are reading a partial file we don't want to use checksums
fs.setVerifyChecksum(false);
fs.setWriteChecksum(false);

// Compression codecs that don't require native hadoop libraries
String [] codecs = {"BZip2Codec", "DeflateCodec"};

for (String codec : codecs) {
sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Collections.singletonList(
"single-event"
));

sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Arrays.asList(
"multiple-events-1",
"multiple-events-2",
"multiple-events-3",
"multiple-events-4",
"multiple-events-5"
));
}

fs.close();
}

private void sequenceFileWriteAndVerifyEvents(FileSystem fs, String hdfsPath, String codec,
Collection<String> eventBodies)
throws IOException, EventDeliveryException {
Path dirPath = new Path(hdfsPath);
fs.delete(dirPath, true);
fs.mkdirs(dirPath);

Context context = new Context();
context.put("hdfs.path", hdfsPath);
// Ensure the file isn't closed and rolled
context.put("hdfs.rollCount", String.valueOf(eventBodies.size() + 1));
context.put("hdfs.rollSize", "0");
context.put("hdfs.rollInterval", "0");
context.put("hdfs.batchSize", "1");
context.put("hdfs.fileType", "SequenceFile");
context.put("hdfs.codeC", codec);
context.put("hdfs.writeFormat", "Writable");
Configurables.configure(sink, context);

Channel channel = new MemoryChannel();
Configurables.configure(channel, context);

sink.setChannel(channel);
sink.start();

for (String eventBody : eventBodies) {
Transaction txn = channel.getTransaction();
txn.begin();

Event event = new SimpleEvent();
event.setBody(eventBody.getBytes());
channel.put(event);

txn.commit();
txn.close();

sink.process();
}

// Sink is _not_ closed. The file should remain open but
// the data written should be visible to readers via sync + hflush
FileStatus[] dirStat = fs.listStatus(dirPath);
Path[] paths = FileUtil.stat2Paths(dirStat);

Assert.assertEquals(1, paths.length);

SequenceFile.Reader reader =
new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.stream(fs.open(paths[0])));
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();

for (String eventBody : eventBodies) {
Assert.assertTrue(reader.next(key, value));
Assert.assertArrayEquals(eventBody.getBytes(), value.copyBytes());
}

Assert.assertFalse(reader.next(key, value));
}

private Context getContextForRetryTests() {
Context context = new Context();
