From 4131419c4045b81a87bcfa7112c38cc95a69ae5f Mon Sep 17 00:00:00 2001 From: Joseph Glanville Date: Sun, 29 Sep 2013 21:26:48 -0700 Subject: [PATCH 1/2] Initial implementation of Kyotocabinet backed Log --- .travis.yml | 5 +- examples/distributed_hash_kyoto.rb | 40 +++++++++++++ floss.gemspec | 1 + lib/floss/log/kyoto.rb | 96 ++++++++++++++++++++++++++++++ lib/floss/node.rb | 2 +- spec/functional/log_spec.rb | 5 ++ 6 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 examples/distributed_hash_kyoto.rb create mode 100644 lib/floss/log/kyoto.rb diff --git a/.travis.yml b/.travis.yml index 47d5054..f0944da 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,6 @@ -before_install: sudo apt-get install libzmq3-dev +before_install: + - sudo add-apt-repository ppa:adrian-wilkins/kyoto-cabinet -y + - sudo apt-get update -y + - sudo apt-get install libkyotocabinet-dev libkyotocabinet16 libzmq3-dev -y script: bundle exec rake language: ruby diff --git a/examples/distributed_hash_kyoto.rb b/examples/distributed_hash_kyoto.rb new file mode 100644 index 0000000..df2f7dd --- /dev/null +++ b/examples/distributed_hash_kyoto.rb @@ -0,0 +1,40 @@ +$: << File.expand_path('../../lib', __FILE__) + +require 'floss/test_helper' +require 'floss/proxy' +require 'floss/log/simple' +require 'floss/log/kyoto' + +include Celluloid::Logger + +class FSM + def initialize + @content = Hash.new + end + + def set(key, value) + @content[key] = value + end + + def get(key) + @content[key] + end +end + +CLUSTER_SIZE = 3 + +ids = CLUSTER_SIZE.times.map do |i| + port = 50000 + i + "tcp://127.0.0.1:#{port}" +end + +proxies = Floss::TestHelper.cluster(ids) do |id, peers| + db = id.split(':').last.to_i - 50000 + path = "/tmp/floss/#{db}.kch" + Floss::Proxy.new(FSM.new, id: id, peers: peers, log: Floss::Log::Kyoto, kyoto_db: path) +end + +100.times do |i| + proxies.sample.set(:foo, i) + raise "fail" unless proxies.sample.get(:foo) == i +end diff --git a/floss.gemspec b/floss.gemspec index a98ddf4..13cf3d0 100644 --- a/floss.gemspec +++ b/floss.gemspec @@ -20,4 +20,5 @@ Gem::Specification.new do |gem| gem.add_development_dependency 'rake' gem.add_runtime_dependency 'celluloid-zmq' gem.add_runtime_dependency 'celluloid-io' + gem.add_runtime_dependency 'kyotocabinet-ruby' end diff --git a/lib/floss/log/kyoto.rb b/lib/floss/log/kyoto.rb new file mode 100644 index 0000000..23b39d4 --- /dev/null +++ b/lib/floss/log/kyoto.rb @@ -0,0 +1,96 @@ +require 'forwardable' +require 'floss' +require 'floss/log' +require 'celluloid/io' +require 'kyotocabinet' + +# See Section 5.3. +class Floss::Log + class Kyoto < Floss::Log + include Celluloid + include Celluloid::Logger + extend Forwardable + + DEFAULT_OPTIONS = { + kyoto_db: '/tmp/floss.kch' + } + + finalizer :finalize + + def initialize(options={}) + @options = DEFAULT_OPTIONS.merge(options) + @db = KyotoCabinet::DB.new + @db.open(@options[:kyoto_db]) + info "Openned log at #{@options[:kyoto_db]}" + end + + # @param [Array] The entries to append to the log. + def append(new_entries) + raise ArgumentError, 'The passed array is empty.' if new_entries.empty? + cur_idx = @db.count + new_entries.each_with_index do |e, idx| + @db[(cur_idx + idx).to_s] = Marshal.dump(e) + end + last_index + end + + def empty? + @db.count == 0 + end + + def [](index) + v = @db[index.to_s] + return nil if v.nil? + Marshal.load(v) + end + + def []=(index,v) + @db[index.to_s] = Marshal.dump(v) + end + + def starting_with(index) + cur = @db.cursor + cur.jump(index.to_s) + records = [] + while rec = cur.get(true) + records << Marshal.load(rec[1]) + end + records + end + + # Returns the last index in the log or nil if the log is empty. + def last_index + len = @db.count + return nil if len == 0 + len > 0 ? len - 1 : nil + end + + # Returns the term of the last entry in the log or nil if the log is empty. + def last_term + return nil if empty? + e = @db[last_index.to_s] + return nil if e.nil? + entry = Marshal.load(e) + entry ? entry.term : nil + end + + def validate(index, term) + # Special case: Accept the first entry if the log is empty. + return empty? if index.nil? && term.nil? + e = @db[index] + return false if e.nil? + entry = Marshal.load(e) + entry && entry.term == term + end + + def remove_starting_with(index) + (index..@db.count).each do |idx| + @db.remove(idx.to_s) + end + end + + def finalize + @db.close + end + end +end diff --git a/lib/floss/node.rb b/lib/floss/node.rb index ba1d11f..7a71ebf 100644 --- a/lib/floss/node.rb +++ b/lib/floss/node.rb @@ -69,7 +69,7 @@ def run raise 'Already running' if @running @running = true - @log = @options[:log].new + @log = @options[:log].new @options self.server = link(rpc_server_class.new(id, &method(:handle_rpc))) @election_timeout = after(random_timeout) { on_election_timeout } diff --git a/spec/functional/log_spec.rb b/spec/functional/log_spec.rb index 35f6174..210cd1e 100644 --- a/spec/functional/log_spec.rb +++ b/spec/functional/log_spec.rb @@ -1,5 +1,6 @@ require 'floss/log' require 'floss/log/simple' +require 'floss/log/kyoto' shared_examples 'a Log implementation' do @@ -57,3 +58,7 @@ describe Floss::Log::Simple do it_should_behave_like 'a Log implementation' end + +describe Floss::Log::Kyoto do + it_should_behave_like 'a Log implementation' +end From 9b34332483c6fdbc439a5f64db9161b6b7612cfa Mon Sep 17 00:00:00 2001 From: Joseph Glanville Date: Mon, 7 Oct 2013 04:26:55 -0700 Subject: [PATCH 2/2] Don't pull celluloid directly from git, use stable versions --- Gemfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Gemfile b/Gemfile index 72d9b8e..efed804 100644 --- a/Gemfile +++ b/Gemfile @@ -3,9 +3,9 @@ source 'https://rubygems.org' # Specify your gem's dependencies in floss.gemspec gemspec -gem 'celluloid', github: 'celluloid/celluloid' -gem 'celluloid-io', github: 'celluloid/celluloid-io' -gem 'celluloid-zmq', github: 'celluloid/celluloid-zmq' +gem 'celluloid' +gem 'celluloid-io' +gem 'celluloid-zmq' group :docs do gem 'yard'