Component Implementation

Julien Cruz edited this page Jan 19, 2015 · 9 revisions

The StreamFlow delegation architecture makes it easy to integrate your existing Spouts and Bolts. You develop Spouts and Bolts using the same native Storm libraries you already use, which makes moving your current Storm components over to StreamFlow a relatively simple process. Although StreamFlow does not require you to import a StreamFlow-specific API, a few libraries can improve your experience working with StreamFlow.

Google Guice Support

The first optional library is Google Guice, a dependency injection framework that StreamFlow uses internally to inject component properties. As the examples show, the Guice @Inject annotation allows you to inject user-provided component properties into your Spouts and Bolts. To include Guice support in your framework project, add the following dependency to your pom.xml.

<dependency>
    <groupId>com.google.inject</groupId>
    <artifactId>guice</artifactId>
    <version></version>
    <scope>provided</scope>
</dependency>

Once the above dependency is added to the pom.xml, the @Inject annotations in your component implementation will be available.

Note: Guice dependency injection is optional, as all StreamFlow properties are also added to the Map config object passed to the prepare() and initialize() methods.
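
As a rough illustration of the note above, the following sketch reads a property from a config map without using Guice. It assumes that each property is stored under the same key as its name in framework.yml, which the note does not state explicitly. The ConfigLookupExample class and its getString() helper are hypothetical names used only for this example, and the HashMap stands in for the Map that Storm passes to prepare().

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of StreamFlow or Storm) for reading properties from the config map. */
final class ConfigLookupExample {

    /** Returns the value stored under key as a String, or defaultValue when the key is absent. */
    static String getString(Map<?, ?> config, String key, String defaultValue) {
        Object value = config.get(key);
        return value != null ? value.toString() : defaultValue;
    }

    public static void main(String[] args) {
        // Stand-in for the Map handed to prepare(); keys are ASSUMED to match the framework.yml property names.
        Map<String, Object> config = new HashMap<>();
        config.put("processing-url", "http://localhost:8080/processing/ws/");

        // The first lookup finds a configured value, the second falls back to the supplied default.
        String url = getString(config, "processing-url", "http://localhost:8080/");
        String pipeline = getString(config, "processing-pipeline", "default");
        System.out.println(url + " " + pipeline);
    }
}
```

Inside a real Bolt, the same lookup would typically happen at the top of prepare(), using the config argument in place of the HashMap built here.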

This section walks through a component implementation using a sample framework.yml and the associated component source code as a reference.

Sample framework.yml

name: sample-framework
label: Sample Framework
version: 1.0.0-SNAPSHOT
components:
    - name: content-processor-bolt
      label: Content Processor Bolt
      type: storm-bolt
      description: Processes content for input tuple
      mainClass: streamflow.processing.bolt.ContentProcessorBolt
      properties:
          - name: processing-url
            label: Processing URL
            description: URL of the target Wisdom processing service
            defaultValue: http://162.16.137.201:8080/processing/ws/
          - name: processing-pipeline
            label: Processing Pipeline
            description: Pipeline to use for processing the data
            defaultValue: default
      inputs:
          - key: default
            description: Unprocessed activity content
      outputs:
          - key: default
            description: Processed activity content

Sample Storm Bolt implementation

1.  import javax.inject.Inject;
2.  import javax.inject.Named;
3. 
4.  public class ContentProcessorBolt extends BaseRichBolt {
5.   
6.     // Proxy Host and Port are injected automatically if available
7.     private String proxyHost;
8.     private int proxyPort;
9. 
10.    // Values entered here will be used as default if specified
11.    private String processingUrl = "http://localhost:8080/";
12.    private String processingPipeline;
13.
14.    @Inject
15.    public void setProxyHost(@Named("http.proxy.host") String proxyHost) {
16.        this.proxyHost = proxyHost;
17.    }
18.    
19.    @Inject
20.    public void setProxyPort(@Named("http.proxy.port") int proxyPort) {
21.        this.proxyPort = proxyPort;
22.    }
23.    
24.    @Inject
25.    public void setProcessingUrl(@Named("processing-url") String processingUrl) {
26.        this.processingUrl = processingUrl;
27.    }
28.
29.    @Inject
30.    public void setProcessingPipeline(@Named("processing-pipeline") String processingPipeline) {
31.        this.processingPipeline = processingPipeline;
32.    }
33.
34.    @Override
35.    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
36.        // IMPLEMENTATION OF PREPARE() GOES HERE...
37.    }
38.
39.    @Override
40.    public void execute(Tuple tuple) {
41.        // IMPLEMENTATION OF EXECUTE() GOES HERE...
42.    }
43.    
44.    @Override
45.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
46.        // IMPLEMENTATION OF DECLAREOUTPUTFIELDS() GOES HERE...
47.    }
48. }

In the above example, the framework.yml file defines two properties: "processing-url" and "processing-pipeline". They are exposed as properties because their values may need to change at runtime.
Typically, properties are added so that users can modify the behavior of a Spout or Bolt without changing internal constants and recompiling the code. The mainClass entry of the sample framework.yml file references the class that implements the Bolt. The class above shows a concrete implementation of one of the Storm Bolt base classes along with the injected properties.

Lines 25 and 30 of the sample Bolt implementation show the use of the @Named annotation to inject the values of the "processing-url" and "processing-pipeline" properties at runtime. When a topology using this Bolt is deployed, the instance variables are populated with the current values after the constructor is called, but before any calls to the inherited Bolt methods (e.g. prepare(), execute()).
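
The ordering described above has a practical consequence: code in the constructor only sees the field defaults, while prepare() sees the injected values. The sketch below imitates that sequence by hand. It does not use Storm or Guice, and the LifecycleSketch class and its run() driver are hypothetical stand-ins for the Bolt and for whatever StreamFlow does internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical bolt-like class; it does not extend any Storm type. */
final class LifecycleSketch {

    // Default that applies only when no value is injected.
    private String processingUrl = "http://localhost:8080/";

    // Records which value each lifecycle step observed.
    final List<String> seen = new ArrayList<>();

    LifecycleSketch() {
        // Injection has not happened yet, so only the default is visible here.
        seen.add("constructor:" + processingUrl);
    }

    // In the real Bolt this setter carries @Inject and @Named("processing-url").
    void setProcessingUrl(String processingUrl) {
        this.processingUrl = processingUrl;
    }

    void prepare() {
        // By the time prepare() runs, the injected value has replaced the default.
        seen.add("prepare:" + processingUrl);
    }

    /** Hand-written stand-in for the sequence: construct, inject, then prepare. */
    static LifecycleSketch run(String injectedUrl) {
        LifecycleSketch bolt = new LifecycleSketch();
        bolt.setProcessingUrl(injectedUrl);
        bolt.prepare();
        return bolt;
    }

    public static void main(String[] args) {
        System.out.println(run("http://example.test/ws/").seen);
    }
}
```

Under this ordering, any setup that depends on a property value (opening a client for processingUrl, for example) belongs in prepare() rather than in the constructor.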

Lines 15 and 20 demonstrate the use of two special @Named injected properties: "http.proxy.host" and "http.proxy.port". As the names imply, these properties are populated at runtime with the configured HTTP proxy host and port values if they are specified in the Storm environment. If no proxy is configured in the environment, proxyHost and proxyPort will have the values null and -1 respectively. The proxy host and proxy port were added as special properties because they are used frequently by many different plugins and are centrally configured in the jetstream.yml configuration file.
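
One way a Bolt might consume these two values is to turn them into a java.net.Proxy, treating the null and -1 defaults as "no proxy". The following is a minimal sketch using only the JDK. The ProxySketch class and toProxy() method are hypothetical names, and treating an empty host or a non-positive port as unconfigured is an assumption beyond what the text above states.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;

/** Hypothetical helper that maps the injected proxy settings to a java.net.Proxy. */
final class ProxySketch {

    /** Returns a direct connection when no proxy is configured, otherwise an HTTP proxy. */
    static Proxy toProxy(String proxyHost, int proxyPort) {
        // null and -1 are the documented "not configured" values; empty host and port 0 are ASSUMED to mean the same.
        if (proxyHost == null || proxyHost.isEmpty() || proxyPort <= 0) {
            return Proxy.NO_PROXY;
        }
        // createUnresolved avoids a DNS lookup at construction time.
        return new Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved(proxyHost, proxyPort));
    }

    public static void main(String[] args) {
        System.out.println(toProxy(null, -1));
        System.out.println(toProxy("proxy.example.test", 3128));
    }
}
```

The resulting Proxy can then be passed to URL.openConnection(Proxy) when the Bolt calls its processing service, so the same code path works with and without a configured proxy.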

This example demonstrates how values from the dynamically built jobs in the JetStream UI can be provided to the Spouts and Bolts during runtime. This feature is critical in allowing topologies to be built dynamically in the JetStream UI while allowing configurations to be modified and redeployed to the cluster without modifying the original Spout or Bolt code.