diff --git a/reactor-top-tags/README.md b/reactor-top-tags/README.md index 7a7c402..54dbe47 100644 --- a/reactor-top-tags/README.md +++ b/reactor-top-tags/README.md @@ -11,7 +11,11 @@ In order to install the module run it in your Spring XD installation, you will n ## Code Tour -The heart of the sample is the processing module named [TopTags.java](src/main/java/com/acme/TopTags.java). This uses the Stream API to perform an average over the last 5 values of data. The [Tuple](http://docs.spring.io/spring-xd/docs/current/reference/html/#tuples) data type is used as a generic container for keyed data. +The heart of the sample is the processing module named [TopTags.java](src/main/java/com/acme/TopTags.java). +This uses the Stream API to calculate the most referenced tags in a given time window. The[Tuple] +(http://docs.spring.io/spring-xd/docs/current/reference/html/#tuples) data type is used as a generic +container for keyed data. + ## Building @@ -42,20 +46,11 @@ xd:> Now create an deploy a stream: ``` -xd:>stream create reactor --definition "http | reactor-top-tags --inputType=application/x-xd-tuple | log" --deploy -``` - -To post several messages, use the script file generateData.script located in this repository. - -``` -xd:>script --file [path-to]/generateData.script +xd:>stream create reactor --definition "tweetstream | reactor-top-tags | log" --deploy ``` -This will post JSON data such as `{"id":"1","measurement":"10"}` with increasing valuespwd for the measurement. The use of the inputType option (all modules have this option) instructs XD to convert the JSON string to an XD Tuple object before invoking the process method. - You should see the stream output in the Spring XD log: ``` -17:17:25,064 1.1.0.SNAP INFO pool-10-thread-12 sink.test3 - {"id":"d5c9617b-4bac-3786-6559-6b0ab221496c","timestamp":1419027445042,"average":12.0} -17:17:25,137 1.1.0.SNAP INFO pool-10-thread-6 sink.test3 - {"id":"661aaab5-a14c-b63d-7e6d-c3329de2866a","timestamp":1419027445133,"average":17.0} +2015-02-16 21:15:34,530 1.1.0.RELEASE INFO pool-13-thread-1 sink.toptweets - {"id":"49ae2d6c-7404-cb90-351a-80234b8d5b21","timestamp":1424139334512,"topTags":{"SNL40":18,"NBAAllStarNYC":4,"SpringXD":4}} ``` diff --git a/reactor-top-tags/pom.xml b/reactor-top-tags/pom.xml index 8491c96..50d311a 100644 --- a/reactor-top-tags/pom.xml +++ b/reactor-top-tags/pom.xml @@ -64,7 +64,7 @@ io.projectreactor reactor-core - 2.0.0.BUILD-SNAPSHOT + 2.0.0.RC1 diff --git a/reactor-top-tags/src/main/java/com/acme/TopTags.java b/reactor-top-tags/src/main/java/com/acme/TopTags.java index f3e4801..88dc4b4 100644 --- a/reactor-top-tags/src/main/java/com/acme/TopTags.java +++ b/reactor-top-tags/src/main/java/com/acme/TopTags.java @@ -17,9 +17,11 @@ import com.jayway.jsonpath.JsonPath; import net.minidev.json.JSONArray; + import org.springframework.util.StringUtils; import org.springframework.xd.reactor.Processor; import org.springframework.xd.tuple.Tuple; + import reactor.fn.Predicate; import reactor.rx.BiStreams; import reactor.rx.Stream; @@ -28,36 +30,47 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.springframework.xd.tuple.TupleBuilder.tuple; +import java.util.LinkedHashMap; + /** * @author Mark Pollack + * @author Marius Bogoevici */ public class TopTags implements Processor { - private int timeWindow; + private long timeWindow; - private int topN; + private long timeShift; + private int topN; - public TopTags(int timeWindow, int topN) { - this.timeWindow = timeWindow; - this.topN = topN; - } - @Override - public Stream process(Stream stream) { + public TopTags(long timeWindow, long timeShift, int topN) { + this.timeWindow = timeWindow; + this.timeShift = timeShift; + this.topN = topN; + } + @Override + public Stream process(Stream stream) { - return stream.flatMap(tweet -> { - JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text"); - return Streams.from(array.toArray(new String[array.size()])); - }) - .map(w -> reactor.fn.tuple.Tuple.of(w, 1)) - .window(timeWindow, SECONDS) - .flatMap(s -> BiStreams.reduceByKey(s, (acc, next) -> acc + next) - .sort((a, b) -> -a.t2.compareTo(b.t2)) - .take(topN)) - .map(entry -> tuple().of("hashtag", entry.t1, "count", entry.t2)); + return stream.flatMap(tweet -> { + JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text"); + return Streams.from(array.toArray(new String[array.size()])); + }) - } + .map(w -> reactor.fn.tuple.Tuple.of(w, 1)) + .window(timeWindow, timeShift, SECONDS) + .map(s -> BiStreams.reduceByKey(s, (acc, next) -> acc + next) + .sort((a, b) -> -a.t2.compareTo(b.t2)) + .take(topN)) + .flatMap(s -> s.reduce(new LinkedHashMap<>(), + (m, v) -> { + m.put(v.t1, v.t2); + return m; + } + )) + .map(m -> tuple().of("topTags", m)); + } } diff --git a/reactor-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java b/reactor-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java index a23f775..ce72bf2 100644 --- a/reactor-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java +++ b/reactor-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java @@ -24,25 +24,36 @@ */ public class TopTagsOptionsMetadata { - private int timeWindow = 1; + private int timeWindow = 1; - private int topN = 10; + private int timeShift = 1; - public int getTopN() { - return topN; - } + private int topN = 10; - @ModuleOption("The number of entires to include in the top N listing") - public void setTopN(int topN) { - this.topN = topN; - } + public int getTopN() { + return topN; + } - public int getTimeWindow() { - return timeWindow; - } + @ModuleOption("The number of entires to include in the top N listing") + public void setTopN(int topN) { + this.topN = topN; + } - @ModuleOption("The length in seconds of the time window") - public void setTimeWindow(int timeWindow) { - this.timeWindow = timeWindow; - } + public int getTimeWindow() { + return timeWindow; + } + + @ModuleOption("The length in seconds of the time window over which the top N tags are calculated") + public void setTimeWindow(int timeWindow) { + this.timeWindow = timeWindow; + } + + public int getTimeShift() { + return timeShift; + } + + @ModuleOption("The frequency in seconds with which the top N tags are calculated") + public void setTimeShift(int timeShift) { + this.timeShift = timeShift; + } } diff --git a/reactor-top-tags/src/main/resources/config/top-tags.xml b/reactor-top-tags/src/main/resources/config/top-tags.xml index f794914..86a371f 100644 --- a/reactor-top-tags/src/main/resources/config/top-tags.xml +++ b/reactor-top-tags/src/main/resources/config/top-tags.xml @@ -1,29 +1,30 @@ - - - - + + + + + - + - + - - - + + + - + - + diff --git a/reactor-top-tags/src/test/java/com/acme/TopTagsTupleTest.java b/reactor-top-tags/src/test/java/com/acme/TopTagsTupleTest.java index c3a66b7..c0cea34 100644 --- a/reactor-top-tags/src/test/java/com/acme/TopTagsTupleTest.java +++ b/reactor-top-tags/src/test/java/com/acme/TopTagsTupleTest.java @@ -18,8 +18,10 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; + import org.springframework.core.io.ClassPathResource; import org.springframework.xd.reactor.Processor; + import reactor.Environment; import reactor.fn.Consumer; import reactor.rx.Stream; @@ -35,55 +37,56 @@ public class TopTagsTupleTest { - protected Environment env; + protected Environment env; - @Before - public void loadEnv() { - env = Environment.initializeIfEmpty().assignErrorJournal(); - } + @Before + public void loadEnv() { + env = Environment.initializeIfEmpty().assignErrorJournal(); + } - @After - public void closeEnv() { - Environment.terminate(); - } + @After + public void closeEnv() { + Environment.terminate(); + } - @Test - public void tags() throws IOException { + @Test + public void tags() throws IOException { - final Broadcaster broadcaster = SerializedBroadcaster.create(); + final Broadcaster broadcaster = SerializedBroadcaster.create(); - Processor processor = new TopTags(1,10); - Stream outputStream = processor.process(broadcaster); + Processor processor = new TopTags(1, 1, 10); + Stream outputStream = processor.process(broadcaster); - outputStream.consume(new Consumer() { - @Override - public void accept(Object o) { - System.out.println("processed : " + o); - } - //TODO - expect + outputStream.consume(new Consumer() { + @Override + public void accept(Object o) { + System.out.println("processed : " + o); + } + //TODO - expect // processed : {"id":"55786760-7472-065d-8e62-eb83260948a4","timestamp":1422399628134,"hashtag":"AndroidGames","count":1} // processed : {"id":"bd99050f-abfa-a239-c09a-f2fe721daafb","timestamp":1422399628182,"hashtag":"Android","count":1} // processed : {"id":"10ce993c-fd57-322d-efa1-16f810918187","timestamp":1422399628184,"hashtag":"GameInsight","count":1} - }); - - ClassPathResource resource = new ClassPathResource("tweets.json"); - Scanner scanner = new Scanner(resource.getInputStream()); - while (scanner.hasNext()) { - String tweet = scanner.nextLine(); - broadcaster.onNext(tweet); - //simulateLatency(); - } - //System.in.read(); - - } - - private void simulateLatency(){ - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + }); + + ClassPathResource resource = new ClassPathResource("tweets.json"); + Scanner scanner = new Scanner(resource.getInputStream()); + while (scanner.hasNext()) { + String tweet = scanner.nextLine(); + broadcaster.onNext(tweet); + //simulateLatency(); + } + //System.in.read(); + + } + + private void simulateLatency() { + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } }