diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index d8bfebfe61..105a036537 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1134,6 +1134,8 @@ cachePatternMatching true Listing direc containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. +fileHeader false Whether to add a header storing the absolute path filename. +fileHeaderKey file Header key to use when appending absolute path filename to event header. =================================== ============================== =================================================== Example for agent named a1: @@ -1151,6 +1153,7 @@ Example for agent named a1: a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.* a1.sources.r1.headers.f2.headerKey1 = value2 a1.sources.r1.headers.f2.headerKey2 = value2-2 + a1.sources.r1.fileHeader = true Twitter 1% firehose Source (experimental) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 1409f2522a..8838320cbe 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -57,13 +57,16 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private boolean addByteOffset; private boolean cachePatternMatching; private boolean committed = true; + private final boolean annotateFileName; + private final String fileNameHeader; /** * Create a ReliableTaildirEventReader to watch the given directory. */ private ReliableTaildirEventReader(Map filePaths, Table headerTable, String positionFilePath, - boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching) throws IOException { + boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching, + boolean annotateFileName, String fileNameHeader) throws IOException { // Sanity checks Preconditions.checkNotNull(filePaths); Preconditions.checkNotNull(positionFilePath); @@ -84,6 +87,8 @@ private ReliableTaildirEventReader(Map filePaths, this.headerTable = headerTable; this.addByteOffset = addByteOffset; this.cachePatternMatching = cachePatternMatching; + this.annotateFileName = annotateFileName; + this.fileNameHeader = fileNameHeader; updateTailFiles(skipToEnd); logger.info("Updating position from position file: " + positionFilePath); @@ -193,9 +198,14 @@ public List readEvents(int numEvents, boolean backoffWithoutNL) } Map headers = currentFile.getHeaders(); - if (headers != null && !headers.isEmpty()) { + if (annotateFileName || (headers != null && !headers.isEmpty())) { for (Event event : events) { - event.getHeaders().putAll(headers); + if (headers != null && !headers.isEmpty()) { + event.getHeaders().putAll(headers); + } + if (annotateFileName) { + event.getHeaders().put(fileNameHeader, currentFile.getPath()); + } } } committed = false; @@ -287,6 +297,10 @@ public static class Builder { private boolean skipToEnd; private boolean addByteOffset; private boolean cachePatternMatching; + private Boolean annotateFileName = + TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER; + private String fileNameHeader = + TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY; public Builder filePaths(Map filePaths) { this.filePaths = filePaths; @@ -318,9 +332,20 @@ public Builder cachePatternMatching(boolean cachePatternMatching) { return this; } + public Builder annotateFileName(boolean annotateFileName) { + this.annotateFileName = annotateFileName; + return this; + } + + public Builder fileNameHeader(String fileNameHeader) { + this.fileNameHeader = fileNameHeader; + return this; + } + public ReliableTaildirEventReader build() throws IOException { return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, - addByteOffset, cachePatternMatching); + addByteOffset, cachePatternMatching, + annotateFileName, fileNameHeader); } } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index eae1b1a40f..a107a01721 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -84,6 +84,8 @@ public class TaildirSource extends AbstractSource implements private List idleInodes = new CopyOnWriteArrayList(); private Long backoffSleepIncrement; private Long maxBackOffSleepInterval; + private boolean fileHeader; + private String fileHeaderKey; @Override public synchronized void start() { @@ -96,6 +98,8 @@ public synchronized void start() { .skipToEnd(skipToEnd) .addByteOffset(byteOffsetHeader) .cachePatternMatching(cachePatternMatching) + .annotateFileName(fileHeader) + .fileNameHeader(fileHeaderKey) .build(); } catch (IOException e) { throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); @@ -176,6 +180,10 @@ public synchronized void configure(Context context) { PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP, PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); + fileHeader = context.getBoolean(FILENAME_HEADER, + DEFAULT_FILE_HEADER); + fileHeaderKey = context.getString(FILENAME_HEADER_KEY, + DEFAULT_FILENAME_HEADER_KEY); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java index 2c49540862..f2347f38c5 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -55,4 +55,12 @@ public class TaildirSourceConfigurationConstants { */ public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching"; public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true; + + /** Header in which to put absolute path filename. */ + public static final String FILENAME_HEADER_KEY = "fileHeaderKey"; + public static final String DEFAULT_FILENAME_HEADER_KEY = "file"; + + /** Whether to include absolute path filename in a header. */ + public static final String FILENAME_HEADER = "fileHeader"; + public static final boolean DEFAULT_FILE_HEADER = false; } diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java index e090b74461..097ee0ba1f 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -41,13 +41,15 @@ import java.util.List; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS; -import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants - .FILE_GROUPS_PREFIX; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.HEADERS_PREFIX; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -288,4 +290,30 @@ public void testFileConsumeOrder() throws IOException { assertArrayEquals("Files not consumed in expected order", expected.toArray(), consumedOrder.toArray()); } + + @Test + public void testPutFilenameHeader() throws IOException { + File f1 = new File(tmpDir, "file1"); + Files.write("f1\n", f1, Charsets.UTF_8); + + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "fg"); + context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*"); + context.put(FILENAME_HEADER, "true"); + context.put(FILENAME_HEADER_KEY, "path"); + + Configurables.configure(source, context); + source.start(); + source.process(); + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + txn.commit(); + txn.close(); + + assertNotNull(e.getHeaders().get("path")); + assertEquals(f1.getAbsolutePath(), + e.getHeaders().get("path")); + } }