Skip to content

Commit

Permalink
FLUME-2955. Add file path to the header in TaildirSource
Browse files Browse the repository at this point in the history
Allow for adding a file path to the header dynamically. This is
particularly useful when the filegroup path contains a regular expression.

(tinawenqiao via Mike Percy)
  • Loading branch information
tinawenqiao authored and mpercy committed Jul 20, 2016
1 parent 358bb67 commit 1ca0765
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 6 deletions.
3 changes: 3 additions & 0 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,8 @@ cachePatternMatching true Listing direc
containing thousands of files. Caching the list of matching files can improve performance.
The order in which files are consumed will also be cached.
Requires that the file system keeps track of modification times with at least a 1-second granularity.
fileHeader false Whether to add a header storing the absolute path of the tailed file.
fileHeaderKey file Header key to use when adding the absolute file path to the event headers.
=================================== ============================== ===================================================

Example for agent named a1:
Expand All @@ -1151,6 +1153,7 @@ Example for agent named a1:
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
Twitter 1% firehose Source (experimental)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,16 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
private boolean addByteOffset;
private boolean cachePatternMatching;
private boolean committed = true;
private final boolean annotateFileName;
private final String fileNameHeader;

/**
* Create a ReliableTaildirEventReader to watch the given directory.
*/
private ReliableTaildirEventReader(Map<String, String> filePaths,
Table<String, String, String> headerTable, String positionFilePath,
boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching) throws IOException {
boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching,
boolean annotateFileName, String fileNameHeader) throws IOException {
// Sanity checks
Preconditions.checkNotNull(filePaths);
Preconditions.checkNotNull(positionFilePath);
Expand All @@ -84,6 +87,8 @@ private ReliableTaildirEventReader(Map<String, String> filePaths,
this.headerTable = headerTable;
this.addByteOffset = addByteOffset;
this.cachePatternMatching = cachePatternMatching;
this.annotateFileName = annotateFileName;
this.fileNameHeader = fileNameHeader;
updateTailFiles(skipToEnd);

logger.info("Updating position from position file: " + positionFilePath);
Expand Down Expand Up @@ -193,9 +198,14 @@ public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
}

Map<String, String> headers = currentFile.getHeaders();
if (headers != null && !headers.isEmpty()) {
if (annotateFileName || (headers != null && !headers.isEmpty())) {
for (Event event : events) {
event.getHeaders().putAll(headers);
if (headers != null && !headers.isEmpty()) {
event.getHeaders().putAll(headers);
}
if (annotateFileName) {
event.getHeaders().put(fileNameHeader, currentFile.getPath());
}
}
}
committed = false;
Expand Down Expand Up @@ -287,6 +297,10 @@ public static class Builder {
private boolean skipToEnd;
private boolean addByteOffset;
private boolean cachePatternMatching;
// Primitive boolean, consistent with the other builder flags and with the
// annotateFileName(boolean) setter; a boxed Boolean adds nothing here but boxing overhead.
private boolean annotateFileName =
TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER;
private String fileNameHeader =
TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;

public Builder filePaths(Map<String, String> filePaths) {
this.filePaths = filePaths;
Expand Down Expand Up @@ -318,9 +332,20 @@ public Builder cachePatternMatching(boolean cachePatternMatching) {
return this;
}

/**
 * Sets whether events read from a file should carry a header holding that file's path.
 *
 * @param annotateFileName {@code true} to add the file path header to each event
 * @return this builder, for call chaining
 */
public Builder annotateFileName(boolean annotateFileName) {
this.annotateFileName = annotateFileName;
return this;
}

/**
 * Sets the header key under which the file path is stored when file name annotation is enabled.
 *
 * @param fileNameHeader header key to use for the file path
 * @return this builder, for call chaining
 */
public Builder fileNameHeader(String fileNameHeader) {
this.fileNameHeader = fileNameHeader;
return this;
}

/**
 * Creates a ReliableTaildirEventReader from the values collected by this builder.
 *
 * @return the newly constructed reader
 * @throws IOException propagated from the ReliableTaildirEventReader constructor
 */
public ReliableTaildirEventReader build() throws IOException {
return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
addByteOffset, cachePatternMatching);
addByteOffset, cachePatternMatching,
annotateFileName, fileNameHeader);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class TaildirSource extends AbstractSource implements
private List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
private Long backoffSleepIncrement;
private Long maxBackOffSleepInterval;
private boolean fileHeader;
private String fileHeaderKey;

@Override
public synchronized void start() {
Expand All @@ -96,6 +98,8 @@ public synchronized void start() {
.skipToEnd(skipToEnd)
.addByteOffset(byteOffsetHeader)
.cachePatternMatching(cachePatternMatching)
.annotateFileName(fileHeader)
.fileNameHeader(fileHeaderKey)
.build();
} catch (IOException e) {
throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
Expand Down Expand Up @@ -176,6 +180,10 @@ public synchronized void configure(Context context) {
PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
fileHeader = context.getBoolean(FILENAME_HEADER,
DEFAULT_FILE_HEADER);
fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
DEFAULT_FILENAME_HEADER_KEY);

if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,12 @@ public class TaildirSourceConfigurationConstants {
*/
public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching";
public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true;

/** Header in which to put absolute path filename. */
public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
public static final String DEFAULT_FILENAME_HEADER_KEY = "file";

/** Whether to include absolute path filename in a header. */
public static final String FILENAME_HEADER = "fileHeader";
public static final boolean DEFAULT_FILE_HEADER = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import java.util.List;

import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS;
import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants
.FILE_GROUPS_PREFIX;
import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX;
import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.HEADERS_PREFIX;
import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE;
import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER;
import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -288,4 +290,30 @@ public void testFileConsumeOrder() throws IOException {
assertArrayEquals("Files not consumed in expected order", expected.toArray(),
consumedOrder.toArray());
}

/**
 * Verifies that, with fileHeader enabled and a custom fileHeaderKey configured, the event
 * taken from the channel carries the absolute path of the tailed file under that key.
 */
@Test
public void testPutFilenameHeader() throws IOException {
  File f1 = new File(tmpDir, "file1");
  Files.write("f1\n", f1, Charsets.UTF_8);

  Context context = new Context();
  context.put(POSITION_FILE, posFilePath);
  context.put(FILE_GROUPS, "fg");
  context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*");
  context.put(FILENAME_HEADER, "true");
  context.put(FILENAME_HEADER_KEY, "path");

  Configurables.configure(source, context);
  source.start();
  source.process();
  Transaction txn = channel.getTransaction();
  txn.begin();
  Event e = channel.take();
  txn.commit();
  txn.close();

  // Fail with a clear assertion, not a NullPointerException, if no event was delivered.
  assertNotNull("Expected an event to be delivered to the channel", e);
  assertNotNull(e.getHeaders().get("path"));
  assertEquals(f1.getAbsolutePath(),
      e.getHeaders().get("path"));
}
}

0 comments on commit 1ca0765

Please sign in to comment.