FLUME-2498. Implement Taildir Source
(Satoshi Iijima via Roshan Naik)
Roshan Naik committed Aug 18, 2015
1 parent 32ef64d commit 757a560
Showing 13 changed files with 1,811 additions and 3 deletions.
@@ -207,7 +207,15 @@ public enum SourceConfigurationType {
*
* @see org.apache.flume.source.jms.JMSSource
*/
JMS("org.apache.flume.conf.source.jms.JMSSourceConfiguration");
JMS("org.apache.flume.conf.source.jms.JMSSourceConfiguration"),

/**
* TAILDIR Source
*
* @see org.apache.flume.source.taildir.TaildirSource
*/
TAILDIR("org.apache.flume.source.taildir.TaildirSourceConfiguration")
;

private String srcConfigurationName;

@@ -103,7 +103,15 @@ public enum SourceType {
*
* @see org.apache.flume.source.jms.JMSSource
*/
JMS("org.apache.flume.source.jms.JMSSource");
JMS("org.apache.flume.source.jms.JMSSource"),

/**
* Taildir Source
*
* @see org.apache.flume.source.taildir.TaildirSource
*/
TAILDIR("org.apache.flume.source.taildir.TaildirSource")
;

private final String sourceClassName;

4 changes: 4 additions & 0 deletions flume-ng-dist/pom.xml
@@ -188,6 +188,10 @@
<groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
<artifactId>flume-thrift-source</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-taildir-source</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
55 changes: 54 additions & 1 deletion flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -994,7 +994,7 @@ trackerDir .flumespool Directory to store metadata related to pro
consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``,
``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified
time of the files will be used to compare the files. In case of a tie, the file
with smallest laxicographical order will be consumed first. In case of ``random`` any
with smallest lexicographical order will be consumed first. In case of ``random`` any
file will be picked randomly. When using ``oldest`` and ``youngest`` the whole
directory will be scanned to pick the oldest/youngest file, which might be slow if there
are a large number of files, while using ``random`` may cause old files to be consumed
@@ -1090,6 +1090,59 @@ Property Name Default Description
deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
========================== ================== =======================================================================

Taildir Source
~~~~~~~~~~~~~~~~~~~~~~~~~
.. note:: **This source is provided as a preview feature. It does not work on Windows.** This source requires Java version 1.7 or later.

Watches the specified files and tails them in near real time once it detects new lines appended to any of them.
If a new line is still being written, this source retries reading it until the write completes.

This source is reliable and will not miss data even when the tailed files are rotated.
It periodically writes the last read position of each file to the given position file in JSON format.
If Flume is stopped or goes down for some reason, it can resume tailing from the position recorded in the existing position file.

In another use case, this source can also start tailing from an arbitrary position in each file by using a given position file.
When there is no position file at the specified path, it starts tailing from the first line of each file by default.
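
As a rough illustration of what such a position file might hold, the Java sketch below renders one entry per tailed file with the three values the guide says are recorded: the inode, the absolute path, and the last read position. The class name `PositionFileSketch`, the JSON key names `inode`, `pos`, and `file`, and the sample values are assumptions made for this example; they are not taken from the Taildir Source code in this commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a position file's contents; not Flume's implementation. */
final class PositionFileSketch {

    /** One tailed file: inode, last read byte position, and absolute path. */
    static final class Entry {
        final long inode;
        final long pos;
        final String file;

        Entry(long inode, long pos, String file) {
            this.inode = inode;
            this.pos = pos;
            this.file = file;
        }
    }

    /** Renders the entries as a JSON array. The key names are assumed, not confirmed. */
    static String toJson(List<Entry> entries) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < entries.size(); i++) {
            Entry e = entries.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"inode\":").append(e.inode)
              .append(",\"pos\":").append(e.pos)
              .append(",\"file\":\"").append(escape(e.file)).append("\"}");
        }
        return sb.append(']').toString();
    }

    /** Escapes backslashes and double quotes so the path stays a valid JSON string. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        List<Entry> entries = new ArrayList<>();
        // Sample values only: a made-up inode and offset for the f1 file group.
        entries.add(new Entry(2496272L, 12L, "/var/log/test1/example.log"));
        System.out.println(toJson(entries));
    }
}
```

Editing the recorded position in a file of this shape before starting the agent is one way to picture the "arbitrary position" use case described above.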

Files are consumed in order of their modification time; the file with the oldest modification time is consumed first.

This source does not rename, delete, or otherwise modify the files being tailed.
Currently this source does not support tailing binary files. It reads text files line by line.

=================================== ============================== ===================================================
Property Name                       Default                        Description
=================================== ============================== ===================================================
**channels**                        --
**type**                            --                             The component type name; must be ``TAILDIR``.
**filegroups**                      --                             Space-separated list of file groups. Each file group indicates a set of files to be tailed.
**filegroups.<filegroupName>**      --                             Absolute path of the file group. A regular expression (not a file system pattern) can be used for the filename only.
positionFile                        ~/.flume/taildir_position.json File in JSON format that records the inode, the absolute path, and the last position of each tailed file.
headers.<filegroupName>.<headerKey> --                             Header value to set for the given header key. Multiple headers can be specified for one file group.
byteOffsetHeader                    false                          Whether to add the byte offset of a tailed line to a header called 'byteoffset'.
skipToEnd                           false                          Whether to skip to EOF for files that are not recorded in the position file.
idleTimeout                         120000                         Time (ms) after which inactive files are closed. If new lines are appended to a closed file, this source automatically re-opens it.
writePosInterval                    3000                           Interval (ms) at which the last position of each file is written to the position file.
batchSize                           100                            Max number of lines to read and send to the channel at a time. Using the default is usually fine.
backoffSleepIncrement               1000                           The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
maxBackoffSleep                     5000                           The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
=================================== ============================== ===================================================
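
To make the interplay of ``backoffSleepIncrement`` and ``maxBackoffSleep`` concrete, here is a minimal Java sketch of one plausible reading of the two descriptions: the delay grows by the increment for each consecutive poll that finds no new data and is capped at the maximum. The class `BackoffSketch`, the method `nextDelayMs`, and the linear growth rule are assumptions for illustration, not code from this commit.

```java
/** Hypothetical helper illustrating the two backoff properties; not Flume's API. */
final class BackoffSketch {

    // Defaults copied from the property table above.
    static final long BACKOFF_SLEEP_INCREMENT_MS = 1000L;
    static final long MAX_BACKOFF_SLEEP_MS = 5000L;

    /**
     * Delay before the next poll after {@code consecutiveEmptyPolls} attempts in a
     * row found no new data. Assumes linear growth capped at {@code maxMs}.
     */
    static long nextDelayMs(int consecutiveEmptyPolls, long incrementMs, long maxMs) {
        if (consecutiveEmptyPolls <= 0) {
            return 0L; // the last attempt found data, so no backoff is applied
        }
        return Math.min(consecutiveEmptyPolls * incrementMs, maxMs);
    }

    public static void main(String[] args) {
        for (int n = 0; n <= 6; n++) {
            long delay = nextDelayMs(n, BACKOFF_SLEEP_INCREMENT_MS, MAX_BACKOFF_SLEEP_MS);
            System.out.println(n + " empty polls -> wait " + delay + " ms");
        }
    }
}
```

Under these assumptions and the default values, an idle source would settle at one poll every 5 seconds after five consecutive empty attempts.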

Example for agent named a1:

.. code-block:: properties

  a1.sources = r1
  a1.channels = c1
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.channels = c1
  a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
  a1.sources.r1.headers.f1.headerKey1 = value1
  a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
  a1.sources.r1.headers.f2.headerKey1 = value2
  a1.sources.r1.headers.f2.headerKey2 = value2-2

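
The file group paths in the example above use a regular expression for the filename part only. The Java sketch below shows one way that could be interpreted: everything up to the last ``/`` is a literal directory, and the remainder is compiled with `java.util.regex.Pattern` and matched against whole file names. The class `FilegroupMatchSketch`, its method names, and the choice of a full match (rather than a partial one) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of file group matching; not the Taildir Source implementation. */
final class FilegroupMatchSketch {

    /** Literal directory part of an absolute file group path. */
    static String parentDirectory(String filegroupPath) {
        return filegroupPath.substring(0, filegroupPath.lastIndexOf('/'));
    }

    /** Returns the names that fully match the regex after the last '/' of the path. */
    static List<String> matchingNames(String filegroupPath, List<String> namesInDirectory) {
        String nameRegex = filegroupPath.substring(filegroupPath.lastIndexOf('/') + 1);
        Pattern namePattern = Pattern.compile(nameRegex);
        List<String> matched = new ArrayList<>();
        for (String name : namesInDirectory) {
            // Assumption: the whole file name must match, not just a substring of it.
            if (namePattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("app.log");
        names.add("app.log.1");
        names.add("notes.txt");
        String f2 = "/var/log/test2/.*log.*";
        System.out.println(parentDirectory(f2) + " -> " + matchingNames(f2, names));
    }
}
```

Because the filename is a regular expression, a value such as ``example.log`` treats the dot as "any character"; writing ``example\.log`` would restrict it to the literal name.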
Twitter 1% firehose Source (experimental)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

60 changes: 60 additions & 0 deletions flume-ng-sources/flume-taildir-source/pom.xml
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>flume-ng-sources</artifactId>
<groupId>org.apache.flume</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>

<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-taildir-source</artifactId>
<name>Flume Taildir Source</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>