Skip to content

Commit

Permalink
FLUME-2206. ElasticSearchSink ttl field modification to mimic Elastic…
Browse files Browse the repository at this point in the history
…search way of specifying TTL

(Dib Ghosh via Hari Shreedharan)
  • Loading branch information
harishreedharan committed Oct 31, 2013
1 parent 3cc8cec commit 6dfe63c
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ derby.log
.idea
*.iml
nb-configuration.xml
.DS_Store
7 changes: 5 additions & 2 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,10 @@ indexType logs
clusterName elasticsearch Name of the ElasticSearch cluster to connect to
batchSize 100 Number of events to be written per txn.
ttl -- TTL in days, when set will cause the expired documents to be deleted automatically,
if not set documents will never be automatically deleted
if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of
integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute),
h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days. Follow
http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of
either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.
serializer.* -- Properties to be passed to the serializer.
Expand All @@ -2003,7 +2006,7 @@ Example for agent named a1:
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
Expand Down Expand Up @@ -98,6 +101,9 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
private String clusterName = DEFAULT_CLUSTER_NAME;
private String indexName = DEFAULT_INDEX_NAME;
private String indexType = DEFAULT_INDEX_TYPE;
private final Pattern pattern
= Pattern.compile(TTL_REGEX, Pattern.CASE_INSENSITIVE);
private Matcher matcher = pattern.matcher("");

private InetSocketTransportAddress[] serverAddresses;

Expand Down Expand Up @@ -269,8 +275,7 @@ public void configure(Context context) {
}

if (StringUtils.isNotBlank(context.getString(TTL))) {
this.ttlMs = TimeUnit.DAYS.toMillis(Integer.parseInt(context
.getString(TTL)));
this.ttlMs = parseTTL(context.getString(TTL));
Preconditions.checkState(ttlMs > 0, TTL
+ " must be greater than 0 or not set.");
}
Expand Down Expand Up @@ -353,6 +358,47 @@ private void openConnection() {
sinkCounter.incrementConnectionCreatedCount();
}

/*
* Returns TTL value of ElasticSearch index in milliseconds
* when TTL specifier is "ms" / "s" / "m" / "h" / "d" / "w".
* In case of unknown specifier TTL is not set. When specifier
* is not provided it defaults to days in milliseconds where the number
* of days is parsed integer from TTL string provided by user.
* <p>
* Elasticsearch supports ttl values being provided in the format: 1d / 1w / 1ms / 1s / 1h / 1m
* specify a time unit like d (days), m (minutes), h (hours), ms (milliseconds) or w (weeks),
* milliseconds is used as default unit.
* http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
* @param ttl TTL value provided by user in flume configuration file for the sink
* @return the ttl value in milliseconds
*/
private long parseTTL(String ttl){
matcher = matcher.reset(ttl);
while (matcher.find()) {
if (matcher.group(2).equals("ms")) {
return Long.parseLong(matcher.group(1));
} else if (matcher.group(2).equals("s")) {
return TimeUnit.SECONDS.toMillis(Integer.parseInt(matcher.group(1)));
} else if (matcher.group(2).equals("m")) {
return TimeUnit.MINUTES.toMillis(Integer.parseInt(matcher.group(1)));
} else if (matcher.group(2).equals("h")) {
return TimeUnit.HOURS.toMillis(Integer.parseInt(matcher.group(1)));
} else if (matcher.group(2).equals("d")) {
return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
} else if (matcher.group(2).equals("w")) {
return TimeUnit.DAYS.toMillis(7 * Integer.parseInt(matcher.group(1)));
} else if (matcher.group(2).equals("")) {
logger.info("TTL qualifier is empty. Defaulting to day qualifier.");
return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
} else {
logger.debug("Unknown TTL qualifier provided. Setting TTL to 0.");
return 0;
}
}
logger.info("TTL not provided. Skipping the TTL config by returning 0.");
return 0;
}

/*
* FOR TESTING ONLY...
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,5 @@ public class ElasticSearchSinkConstants {
public static final String DEFAULT_INDEX_NAME = "flume";
public static final String DEFAULT_INDEX_TYPE = "log";
public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
public static final String TTL_REGEX = "^(\\d+)(\\D*)";
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -286,6 +288,40 @@ public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory()
assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext);
}

@Test
public void shouldParseFullyQualifiedTTLs(){
Map<String, Long> testTTLMap = new HashMap<String, Long>();
testTTLMap.put("1ms", Long.valueOf(1));
testTTLMap.put("1s", Long.valueOf(1000));
testTTLMap.put("1m", Long.valueOf(60000));
testTTLMap.put("1h", Long.valueOf(3600000));
testTTLMap.put("1d", Long.valueOf(86400000));
testTTLMap.put("1w", Long.valueOf(604800000));
testTTLMap.put("1", Long.valueOf(86400000));

parameters.put(HOSTNAMES, "10.5.5.27");
parameters.put(CLUSTER_NAME, "testing-cluster-name");
parameters.put(INDEX_NAME, "testing-index-name");
parameters.put(INDEX_TYPE, "testing-index-type");

for (String ttl : testTTLMap.keySet()) {
parameters.put(TTL, ttl);
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));

InetSocketTransportAddress[] expected = {new InetSocketTransportAddress(
"10.5.5.27", DEFAULT_PORT)};

assertEquals("testing-cluster-name", fixture.getClusterName());
assertEquals("testing-index-name", fixture.getIndexName());
assertEquals("testing-index-type", fixture.getIndexType());
System.out.println("TTL MS" + Long.toString(testTTLMap.get(ttl)));
assertEquals((long) testTTLMap.get(ttl), fixture.getTTLMs());
assertArrayEquals(expected, fixture.getServerAddresses());

}
}

public static final class CustomElasticSearchIndexRequestBuilderFactory
extends AbstractElasticSearchIndexRequestBuilderFactory {

Expand Down

0 comments on commit 6dfe63c

Please sign in to comment.