Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for IBatchSpout to redstorm #57

Closed
wants to merge 10 commits into from
59 changes: 59 additions & 0 deletions generator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
require 'erb'
require 'pry'

require 'java'
require 'active_support/core_ext'

Dir["../triggit-storm/target/dependency/storm/default/*"].each{|f| $CLASSPATH << File.expand_path(f) }


PROXY_JRUBY_TEMPLATE = File.read("./ruby_proxy.erb")
PROXY_JAVA_TEMPLATE = File.read("./java_proxy.erb")

to_generate = ["storm.trident.operation.Function"]


# Return all java functions of a java class
def get_functions(jlass)
jlass.declared_instance_methods.concat( jlass.interfaces.map{|i| get_functions(i) }.flatten )
end

# Return all java deps of a class
def get_java_deps(functions, klass)
functions.map{|f| [f.argument_types.map{|at| at.name}, f.return_type ? f.return_type.name : "void"] }.flatten.uniq.reject{|t| t.split('.').count == 1} << klass
end

to_generate.each do |klass|
_functions = get_functions(Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class)

java_deps = get_java_deps(_functions, klass)


# Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } }
functions = _functions.reduce({}) do |memo, f|
before_serialization = %w{ }.include?(f.name.to_s)
memoize = %w{ prepare execute }.include?(f.name.to_s)
memo[:"#{f.name}"] = {
:return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void",
:args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)},
:before_serialization => before_serialization,
:memoize => memoize
}
memo
end

interface_name = klass.split(".")[-1]

# IBlah to ProxyBlah if IBlah
ruby_class_name = "Proxy#{interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name}"

java_class_name = "JRuby#{ruby_class_name}"

# Rubyify java functions into {:method_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } }
methods = functions.map do |f_name, params|
{f_name.to_s.underscore.to_sym => {:return_type => params[:return_type], :args => params[:args].map{|name, type| {name.to_s.underscore.to_sym => type}}.reduce({}){|m,o| m.merge(o)} }}
end.reduce({}){|m,o| m.merge(o)}

File.open("./lib/red_storm/proxy/#{ruby_class_name.underscore}.rb", 'w') {|f| f.write(ERB.new(PROXY_JRUBY_TEMPLATE).result(binding)) }
File.open("./src/main/redstorm/storm/jruby/#{java_class_name}.java", 'w') {|f| f.write(ERB.new(PROXY_JAVA_TEMPLATE).result(binding)) }
end
51 changes: 51 additions & 0 deletions java_proxy.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package redstorm.storm.jruby;
<% java_deps.each do |dep| %>
import <%= dep %>;<% end %>

public class <%= java_class_name %> implements <%= interface_name %> {
<%= interface_name %> _proxy;
String _realClassName;
String _baseClassPath;
String[] _fields;

public <%= java_class_name %>(final String baseClassPath, final String realClassName, final String[] fields) {
_baseClassPath = baseClassPath;
_realClassName = realClassName;
_fields = fields;
}

<% functions.each do |function_name, params| %>
@Override
public <%= params[:return_type] %> <%= function_name %>(<%= params[:args].map{|n,t| ["final #{t}", n].join(' ') }.flatten.join(', ') %>) {
<% if function_name == :open %>
_proxy = newProxy(_baseClassPath, _realClassName);
_proxy.open(<%= params[:args].keys.flatten.join(', ') %>);
<% elsif function_name == :declareOutputFields %>
if (_fields.length > 0) {
<%= params[:args].values[0] %>.declare(new Fields(_fields));
} else {
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
}
<% elsif params[:before_serialization] %>
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% elsif params[:memoize] %>
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% else %>
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% end %>
}
<% end %>

private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) {
try {
redstorm.proxy.<%= ruby_class_name %> proxy = new redstorm.proxy.<%= ruby_class_name %>(baseClassPath, realClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
1 change: 1 addition & 0 deletions lib/red_storm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
require 'red_storm/simple_bolt'
require 'red_storm/simple_spout'
require 'red_storm/simple_topology'
require 'red_storm/simple_drpc_topology'
12 changes: 6 additions & 6 deletions lib/red_storm/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
TARGET_SRC_DIR = "#{TARGET_DIR}/src"
TARGET_GEM_DIR = "#{TARGET_DIR}/gems/gems"
TARGET_SPECS_DIR = "#{TARGET_DIR}/gems/specifications"
TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes"
TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes"
TARGET_DEPENDENCY_DIR = "#{TARGET_DIR}/dependency"
TARGET_DEPENDENCY_UNPACKED_DIR = "#{TARGET_DIR}/dependency-unpacked"
TARGET_CLUSTER_JAR = "#{TARGET_DIR}/cluster-topology.jar"
Expand All @@ -26,9 +26,9 @@


module RedStorm
class Application
TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake"

class Application
TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake"

def self.local_storm_command(class_file, ruby_mode = nil)
src_dir = File.expand_path(File.dirname(class_file))
Expand All @@ -38,7 +38,7 @@ def self.local_storm_command(class_file, ruby_mode = nil)
def self.cluster_storm_command(class_file, ruby_mode = nil)
"storm jar #{TARGET_CLUSTER_JAR} -Djruby.compat.version=#{RedStorm.jruby_mode_token(ruby_mode)} redstorm.TopologyLauncher cluster #{class_file}"
end

def self.usage
puts("usage: redstorm version")
puts(" redstorm install")
Expand Down Expand Up @@ -82,4 +82,4 @@ def self.subshell(command)

end

end
end
71 changes: 71 additions & 0 deletions lib/red_storm/proxy/batch_spout.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
require 'java'

java_import 'storm.trident.operation.TridentCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'storm.trident.spout.IBatchSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end

java_package 'redstorm.proxy'

# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the
# Java artifacts when creating a spout.
#
# The real batch spout class implementation must define these methods:
# - open(conf, context, collector)
# - emitBatch
# - getOutputFields
# - ack(batch_id)
#
# and optionnaly:
# - close
#

class BatchSpout
java_implements IBatchSpout

java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
end

java_signature 'void open(Map, TopologyContext)'
def open(conf, context)
@real_spout.open(conf, context)
end

java_signature 'void close()'
def close
@real_spout.close if @real_spout.respond_to?(:close)
end

java_signature 'void emitBatch(long, TridentCollector)'
def emitBatch(batch_id, collector)
@real_spout.emit_batch(batch_id, collector)
end

java_signature 'void ack(long)'
def ack(batch_id)
@real_spout.ack(batch_id)
end

java_signature 'Fields getOutputFields()'
def getOutputFields
@real_spout.get_output_fields()
end

java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_spout.get_component_configuration
end

end
48 changes: 48 additions & 0 deletions lib/red_storm/proxy/proxy_function.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require 'java'


java_import 'storm.trident.tuple.TridentTuple'

java_import 'storm.trident.operation.TridentCollector'

java_import 'java.util.Map'

java_import 'storm.trident.operation.TridentOperationContext'

java_import 'storm.trident.operation.Function'


module Backtype
java_import 'backtype.storm.Config'
end

java_package 'redstorm.proxy'

class ProxyFunction
java_implements Function

java_signature 'Function (String base_class_path, String real_class_name)'
def initialize(base_class_path, real_class_name)
@real = Object.module_eval(real_class_name).new
rescue NameError
require base_class_path
@real = Object.module_eval(real_class_name).new
end

java_signature 'void execute(TridentTuple, TridentCollector)'
def execute(_trident_tuple, _trident_collector)
@real.execute(_trident_tuple, _trident_collector)
end

java_signature 'void cleanup()'
def cleanup()
@real.cleanup()
end

java_signature 'void prepare(Map, TridentOperationContext)'
def prepare(_map, _trident_operation_context)
@real.prepare(_map, _trident_operation_context)
end


end
87 changes: 87 additions & 0 deletions lib/red_storm/simple_drpc_topology.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'

module RedStorm

class InputBoltDefinition < SimpleTopology::BoltDefinition
attr_accessor :grouping

def initialize(*args)
super
@grouping = :none
end

def grouping(grouping)
@grouping = @grouping
end

def define_grouping(declarer)

case @grouping
when :fields
declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping()
when :shuffle
declarer.shuffleGrouping()
when :local_or_shuffle
declarer.localOrShuffleGrouping()
when :none
declarer.noneGrouping()
when :all
declarer.allGrouping()
when :direct
declarer.directGrouping()
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end

class SimpleDRPCTopology < SimpleTopology

def self.spout
raise TopologyDefinitionError, "DRPC spout is already defined"
end

def start(base_class_path, env)
builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name)

self.class.bolts.each do |bolt|
declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end

# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}

configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)

drpc = nil
if env == :local
drpc = LocalDRPC.new
submitter = @cluster = LocalCluster.new
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc))
else
submitter = StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology)
end
instance_exec(env, drpc, &self.class.submit_block)
end

def self.input_bolt(bolt_class, *args, &bolt_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)

bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
end

end
8 changes: 4 additions & 4 deletions lib/red_storm/simple_topology.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def is_java?
end

class SpoutDefinition < ComponentDefinition

# WARNING non-dry see BoltDefinition#new_instance
def new_instance(base_class_path)
if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
Expand All @@ -49,7 +49,7 @@ def new_instance(base_class_path)
# is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end

class BoltDefinition < ComponentDefinition
attr_accessor :sources, :command

Expand Down Expand Up @@ -119,7 +119,7 @@ def self.spout(spout_class, *args, &spout_block)
def self.bolt(bolt_class, *args, &bolt_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt_options = {:id => options[:id] ? options[:id] : self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)

bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
Expand Down Expand Up @@ -158,7 +158,7 @@ def start(base_class_path, env)

configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)

submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology)
instance_exec(env, &self.class.submit_block)
Expand Down
Loading