Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

[Sprint: 49] Update Reactor example: #18

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions reactor-top-tags/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ In order to install the module run it in your Spring XD installation, you will n

## Code Tour

The heart of the sample is the processing module named [TopTags.java](src/main/java/com/acme/TopTags.java). This uses the Stream API to perform an average over the last 5 values of data. The [Tuple](http://docs.spring.io/spring-xd/docs/current/reference/html/#tuples) data type is used as a generic container for keyed data.
The heart of the sample is the processing module named [TopTags.java](src/main/java/com/acme/TopTags.java).
This uses the Stream API to calculate the most referenced tags in a given time window. The[Tuple]
(http://docs.spring.io/spring-xd/docs/current/reference/html/#tuples) data type is used as a generic
container for keyed data.



## Building
Expand Down Expand Up @@ -42,20 +46,11 @@ xd:>
Now create an deploy a stream:

```
xd:>stream create reactor --definition "http | reactor-top-tags --inputType=application/x-xd-tuple | log" --deploy
```

To post several messages, use the script file generateData.script located in this repository.

```
xd:>script --file [path-to]/generateData.script
xd:>stream create reactor --definition "tweetstream | reactor-top-tags | log" --deploy
```

This will post JSON data such as `{"id":"1","measurement":"10"}` with increasing valuespwd for the measurement. The use of the inputType option (all modules have this option) instructs XD to convert the JSON string to an XD Tuple object before invoking the process method.

You should see the stream output in the Spring XD log:

```
17:17:25,064 1.1.0.SNAP INFO pool-10-thread-12 sink.test3 - {"id":"d5c9617b-4bac-3786-6559-6b0ab221496c","timestamp":1419027445042,"average":12.0}
17:17:25,137 1.1.0.SNAP INFO pool-10-thread-6 sink.test3 - {"id":"661aaab5-a14c-b63d-7e6d-c3329de2866a","timestamp":1419027445133,"average":17.0}
2015-02-16 21:15:34,530 1.1.0.RELEASE INFO pool-13-thread-1 sink.toptweets - {"id":"49ae2d6c-7404-cb90-351a-80234b8d5b21","timestamp":1424139334512,"topTags":{"SNL40":18,"NBAAllStarNYC":4,"SpringXD":4}}
```
2 changes: 1 addition & 1 deletion reactor-top-tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.0.0.RC1</version>
</dependency>


Expand Down
51 changes: 32 additions & 19 deletions reactor-top-tags/src/main/java/com/acme/TopTags.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;

import org.springframework.util.StringUtils;
import org.springframework.xd.reactor.Processor;
import org.springframework.xd.tuple.Tuple;

import reactor.fn.Predicate;
import reactor.rx.BiStreams;
import reactor.rx.Stream;
Expand All @@ -28,36 +30,47 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.springframework.xd.tuple.TupleBuilder.tuple;

import java.util.LinkedHashMap;

/**
* @author Mark Pollack
* @author Marius Bogoevici
*/
public class TopTags implements Processor<String, Tuple> {

private int timeWindow;
private long timeWindow;

private int topN;
private long timeShift;

private int topN;

public TopTags(int timeWindow, int topN) {
this.timeWindow = timeWindow;
this.topN = topN;
}

@Override
public Stream<Tuple> process(Stream<String> stream) {
public TopTags(long timeWindow, long timeShift, int topN) {
this.timeWindow = timeWindow;
this.timeShift = timeShift;
this.topN = topN;
}

@Override
public Stream<Tuple> process(Stream<String> stream) {

return stream.flatMap(tweet -> {
JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text");
return Streams.from(array.toArray(new String[array.size()]));
})

.map(w -> reactor.fn.tuple.Tuple.of(w, 1))
.window(timeWindow, SECONDS)
.flatMap(s -> BiStreams.reduceByKey(s, (acc, next) -> acc + next)
.sort((a, b) -> -a.t2.compareTo(b.t2))
.take(topN))
.map(entry -> tuple().of("hashtag", entry.t1, "count", entry.t2));
return stream.flatMap(tweet -> {
JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text");
return Streams.from(array.toArray(new String[array.size()]));
})

}
.map(w -> reactor.fn.tuple.Tuple.of(w, 1))
.window(timeWindow, timeShift, SECONDS)
.map(s -> BiStreams.reduceByKey(s, (acc, next) -> acc + next)
.sort((a, b) -> -a.t2.compareTo(b.t2))
.take(topN))
.flatMap(s -> s.reduce(new LinkedHashMap<>(),
(m, v) -> {
m.put(v.t1, v.t2);
return m;
}
))
.map(m -> tuple().of("topTags", m));
}
}
43 changes: 27 additions & 16 deletions reactor-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,36 @@
*/
public class TopTagsOptionsMetadata {

private int timeWindow = 1;
private int timeWindow = 1;

private int topN = 10;
private int timeShift = 1;

public int getTopN() {
return topN;
}
private int topN = 10;

@ModuleOption("The number of entires to include in the top N listing")
public void setTopN(int topN) {
this.topN = topN;
}
public int getTopN() {
return topN;
}

public int getTimeWindow() {
return timeWindow;
}
@ModuleOption("The number of entires to include in the top N listing")
public void setTopN(int topN) {
this.topN = topN;
}

@ModuleOption("The length in seconds of the time window")
public void setTimeWindow(int timeWindow) {
this.timeWindow = timeWindow;
}
public int getTimeWindow() {
return timeWindow;
}

@ModuleOption("The length in seconds of the time window over which the top N tags are calculated")
public void setTimeWindow(int timeWindow) {
this.timeWindow = timeWindow;
}

public int getTimeShift() {
return timeShift;
}

@ModuleOption("The frequency in seconds with which the top N tags are calculated")
public void setTimeShift(int timeShift) {
this.timeShift = timeShift;
}
}
31 changes: 16 additions & 15 deletions reactor-top-tags/src/main/resources/config/top-tags.xml
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
xmlns:int="http://www.springframework.org/schema/integration"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


<bean id="processor" class="com.acme.TopTags">
<constructor-arg index="0" value="${timeWindow}"/>
<constructor-arg index="1" value="${topN}"/>
</bean>
<bean id="processor" class="com.acme.TopTags">
<constructor-arg index="0" value="${timeWindow}"/>
<constructor-arg index="1" value="${timeShift}"/>
<constructor-arg index="2" value="${topN}"/>
</bean>


<!-- The rest is boilerplate that XD 1.1 RC1 will avoid you having to provide -->
<!-- The rest is boilerplate that XD 1.1 RC1 will avoid you having to provide -->

<int:channel id="input"/>
<int:channel id="input"/>

<bean name="messageHandler" class="org.springframework.xd.reactor.BroadcasterMessageHandler">
<constructor-arg ref="processor"/>
</bean>
<bean name="messageHandler" class="org.springframework.xd.reactor.BroadcasterMessageHandler">
<constructor-arg ref="processor"/>
</bean>


<int:service-activator input-channel="input" ref="messageHandler"
output-channel="output"/>
<int:service-activator input-channel="input" ref="messageHandler"
output-channel="output"/>

<int:channel id="output"/>
<int:channel id="output"/>

</beans>
83 changes: 43 additions & 40 deletions reactor-top-tags/src/test/java/com/acme/TopTagsTupleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.springframework.core.io.ClassPathResource;
import org.springframework.xd.reactor.Processor;

import reactor.Environment;
import reactor.fn.Consumer;
import reactor.rx.Stream;
Expand All @@ -35,55 +37,56 @@
public class TopTagsTupleTest {


protected Environment env;
protected Environment env;

@Before
public void loadEnv() {
env = Environment.initializeIfEmpty().assignErrorJournal();
}
@Before
public void loadEnv() {
env = Environment.initializeIfEmpty().assignErrorJournal();
}

@After
public void closeEnv() {
Environment.terminate();
}
@After
public void closeEnv() {
Environment.terminate();
}

@Test
public void tags() throws IOException {
@Test
public void tags() throws IOException {

final Broadcaster<Object> broadcaster = SerializedBroadcaster.create();
final Broadcaster<Object> broadcaster = SerializedBroadcaster.create();

Processor processor = new TopTags(1,10);
Stream<?> outputStream = processor.process(broadcaster);
Processor processor = new TopTags(1, 1, 10);
Stream<?> outputStream = processor.process(broadcaster);


outputStream.consume(new Consumer<Object>() {
@Override
public void accept(Object o) {
System.out.println("processed : " + o);
}
//TODO - expect
outputStream.consume(new Consumer<Object>() {
@Override
public void accept(Object o) {
System.out.println("processed : " + o);
}
//TODO - expect
// processed : {"id":"55786760-7472-065d-8e62-eb83260948a4","timestamp":1422399628134,"hashtag":"AndroidGames","count":1}
// processed : {"id":"bd99050f-abfa-a239-c09a-f2fe721daafb","timestamp":1422399628182,"hashtag":"Android","count":1}
// processed : {"id":"10ce993c-fd57-322d-efa1-16f810918187","timestamp":1422399628184,"hashtag":"GameInsight","count":1}
});

ClassPathResource resource = new ClassPathResource("tweets.json");
Scanner scanner = new Scanner(resource.getInputStream());
while (scanner.hasNext()) {
String tweet = scanner.nextLine();
broadcaster.onNext(tweet);
//simulateLatency();
}
//System.in.read();

}

private void simulateLatency(){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

ClassPathResource resource = new ClassPathResource("tweets.json");
Scanner scanner = new Scanner(resource.getInputStream());
while (scanner.hasNext()) {
String tweet = scanner.nextLine();
broadcaster.onNext(tweet);
//simulateLatency();
}
//System.in.read();

}

private void simulateLatency() {
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}

}