diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index ae9583620a..05d43261fe 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -60,6 +60,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private boolean committed = true; private final boolean annotateFileName; private final String fileNameHeader; + private boolean inodeOnly = false; /** * Create a ReliableTaildirEventReader to watch the given directory. @@ -67,7 +68,9 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private ReliableTaildirEventReader(Map filePaths, Table headerTable, String positionFilePath, boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching, - boolean annotateFileName, String fileNameHeader) throws IOException { + boolean annotateFileName, String fileNameHeader, boolean inodeOnly) + throws IOException { + this.inodeOnly = inodeOnly; // Sanity checks Preconditions.checkNotNull(filePaths); Preconditions.checkNotNull(positionFilePath); @@ -134,7 +137,13 @@ public void loadPositionFile(String filePath) { + "inode: " + inode + ", pos: " + pos + ", path: " + path); } TailFile tf = tailFiles.get(inode); - if (tf != null && tf.updatePos(path, inode, pos)) { + boolean isLoaded = false; + if (inodeOnly) { + isLoaded = tf != null && tf.updatePosAndPath(path, inode, pos); + } else { + isLoaded = tf != null && tf.updatePos(path, inode, pos); + } + if (isLoaded) { tailFiles.put(inode, tf); } else { logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos); @@ -251,7 +260,14 @@ public List updateTailFiles(boolean skipToEnd) throws IOException { continue; } TailFile tf = tailFiles.get(inode); - if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { + + boolean isNewFile = false; + if (inodeOnly) { + isNewFile = tf == null; + } else { + isNewFile = tf == null || !tf.getPath().equals(f.getAbsolutePath()); + } + if (isNewFile) { long startPos = skipToEnd ? f.length() : 0; tf = openFile(f, headers, inode, startPos); } else { @@ -267,6 +283,7 @@ public List updateTailFiles(boolean skipToEnd) throws IOException { } } tf.setNeedTail(updated); + tf.setPath(f.getAbsolutePath()); } tailFiles.put(inode, tf); updatedInodes.add(inode); @@ -308,6 +325,8 @@ public static class Builder { TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER; private String fileNameHeader = TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY; + private Boolean inodeOnly = + TaildirSourceConfigurationConstants.DEFAULT_INODE_ONLY; public Builder filePaths(Map filePaths) { this.filePaths = filePaths; @@ -349,10 +368,15 @@ public Builder fileNameHeader(String fileNameHeader) { return this; } + public Builder inodeOnly(boolean inodeOnly) { + this.inodeOnly = inodeOnly; + return this; + } + public ReliableTaildirEventReader build() throws IOException { return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset, cachePatternMatching, - annotateFileName, fileNameHeader); + annotateFileName, fileNameHeader, inodeOnly); } } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java index 42474c4f75..c19ac934d7 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java @@ -43,7 +43,7 @@ public class TailFile { private static final int NEED_READING = -1; private RandomAccessFile raf; - private final String path; + private String path; private final long inode; private long pos; private long lastUpdated; @@ -107,6 +107,10 @@ public void setPos(long pos) { this.pos = pos; } + public void setPath(String path) { + this.path = path; + } + public void setLastUpdated(long lastUpdated) { this.lastUpdated = lastUpdated; } @@ -128,6 +132,19 @@ public boolean updatePos(String path, long inode, long pos) throws IOException { } return false; } + + public boolean updatePosAndPath(String path, long inode, long pos) throws IOException { + if (this.inode == inode) { + setPos(pos); + setPath(path); + updateFilePos(pos); + logger.info("Updated position and path, file: " + path + ", inode: " + inode + + ", pos: " + pos); + return true; + } + return false; + } + public void updateFilePos(long pos) throws IOException { raf.seek(pos); lineReadPos = pos; diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index 9ecccd7487..18c1eec042 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -88,6 +88,7 @@ public class TaildirSource extends AbstractSource implements private boolean fileHeader; private String fileHeaderKey; private Long maxBatchCount; + private boolean inodeOnly; @Override public synchronized void start() { @@ -102,6 +103,7 @@ public synchronized void start() { .cachePatternMatching(cachePatternMatching) .annotateFileName(fileHeader) .fileNameHeader(fileHeaderKey) + .inodeOnly(inodeOnly) .build(); } catch (IOException e) { throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); @@ -187,6 +189,7 @@ public synchronized void configure(Context context) { fileHeaderKey = context.getString(FILENAME_HEADER_KEY, DEFAULT_FILENAME_HEADER_KEY); maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT); + inodeOnly = context.getBoolean(INODE_ONLY, DEFAULT_INODE_ONLY); if (maxBatchCount <= 0) { maxBatchCount = DEFAULT_MAX_BATCH_COUNT; logger.warn("Invalid maxBatchCount specified, initializing source " diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java index c614e26a5d..d54b03ae55 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -67,4 +67,8 @@ public class TaildirSourceConfigurationConstants { /** The max number of batch reads from a file in one loop */ public static final String MAX_BATCH_COUNT = "maxBatchCount"; public static final Long DEFAULT_MAX_BATCH_COUNT = Long.MAX_VALUE; + + /** Whether to support file rotation in case it only checks inode nor both inode and file name */ + public static final String INODE_ONLY = "inodeOnly"; + public static final Boolean DEFAULT_INODE_ONLY = false; }