Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Initial implementation of Kyotocabinet backed Log #24

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
before_install: sudo apt-get install libzmq3-dev
before_install:
- sudo add-apt-repository ppa:adrian-wilkins/kyoto-cabinet -y
- sudo apt-get update -y
- sudo apt-get install libkyotocabinet-dev libkyotocabinet16 libzmq3-dev -y
script: bundle exec rake
language: ruby
6 changes: 3 additions & 3 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in floss.gemspec
gemspec

gem 'celluloid', github: 'celluloid/celluloid'
gem 'celluloid-io', github: 'celluloid/celluloid-io'
gem 'celluloid-zmq', github: 'celluloid/celluloid-zmq'
gem 'celluloid'
gem 'celluloid-io'
gem 'celluloid-zmq'

group :docs do
gem 'yard'
Expand Down
40 changes: 40 additions & 0 deletions examples/distributed_hash_kyoto.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
$: << File.expand_path('../../lib', __FILE__)

require 'floss/test_helper'
require 'floss/proxy'
require 'floss/log/simple'
require 'floss/log/kyoto'

include Celluloid::Logger

class FSM
def initialize
@content = Hash.new
end

def set(key, value)
@content[key] = value
end

def get(key)
@content[key]
end
end

CLUSTER_SIZE = 3

ids = CLUSTER_SIZE.times.map do |i|
port = 50000 + i
"tcp://127.0.0.1:#{port}"
end

proxies = Floss::TestHelper.cluster(ids) do |id, peers|
db = id.split(':').last.to_i - 50000
path = "/tmp/floss/#{db}.kch"
Floss::Proxy.new(FSM.new, id: id, peers: peers, log: Floss::Log::Kyoto, kyoto_db: path)
end

100.times do |i|
proxies.sample.set(:foo, i)
raise "fail" unless proxies.sample.get(:foo) == i
end
1 change: 1 addition & 0 deletions floss.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ Gem::Specification.new do |gem|
gem.add_development_dependency 'rake'
gem.add_runtime_dependency 'celluloid-zmq'
gem.add_runtime_dependency 'celluloid-io'
gem.add_runtime_dependency 'kyotocabinet-ruby'
end
96 changes: 96 additions & 0 deletions lib/floss/log/kyoto.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
require 'forwardable'
require 'floss'
require 'floss/log'
require 'celluloid/io'
require 'kyotocabinet'

# See Section 5.3.
class Floss::Log
class Kyoto < Floss::Log
include Celluloid
include Celluloid::Logger
extend Forwardable

DEFAULT_OPTIONS = {
kyoto_db: '/tmp/floss.kch'
}

finalizer :finalize

def initialize(options={})
@options = DEFAULT_OPTIONS.merge(options)
@db = KyotoCabinet::DB.new
@db.open(@options[:kyoto_db])
info "Openned log at #{@options[:kyoto_db]}"
end

# @param [Array] The entries to append to the log.
def append(new_entries)
raise ArgumentError, 'The passed array is empty.' if new_entries.empty?
cur_idx = @db.count
new_entries.each_with_index do |e, idx|
@db[(cur_idx + idx).to_s] = Marshal.dump(e)
end
last_index
end

def empty?
@db.count == 0
end

def [](index)
v = @db[index.to_s]
return nil if v.nil?
Marshal.load(v)
end

def []=(index,v)
@db[index.to_s] = Marshal.dump(v)
end

def starting_with(index)
cur = @db.cursor
cur.jump(index.to_s)
records = []
while rec = cur.get(true)
records << Marshal.load(rec[1])
end
records
end

# Returns the last index in the log or nil if the log is empty.
def last_index
len = @db.count
return nil if len == 0
len > 0 ? len - 1 : nil
end

# Returns the term of the last entry in the log or nil if the log is empty.
def last_term
return nil if empty?
e = @db[last_index.to_s]
return nil if e.nil?
entry = Marshal.load(e)
entry ? entry.term : nil
end

def validate(index, term)
# Special case: Accept the first entry if the log is empty.
return empty? if index.nil? && term.nil?
e = @db[index]
return false if e.nil?
entry = Marshal.load(e)
entry && entry.term == term
end

def remove_starting_with(index)
([email protected]).each do |idx|
@db.remove(idx.to_s)
end
end

def finalize
@db.close
end
end
end
2 changes: 1 addition & 1 deletion lib/floss/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def run
raise 'Already running' if @running

@running = true
@log = @options[:log].new
@log = @options[:log].new @options

self.server = link(rpc_server_class.new(id, &method(:handle_rpc)))
@election_timeout = after(random_timeout) { on_election_timeout }
Expand Down
5 changes: 5 additions & 0 deletions spec/functional/log_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'floss/log'
require 'floss/log/simple'
require 'floss/log/kyoto'

shared_examples 'a Log implementation' do

Expand Down Expand Up @@ -57,3 +58,7 @@
describe Floss::Log::Simple do
it_should_behave_like 'a Log implementation'
end

describe Floss::Log::Kyoto do
it_should_behave_like 'a Log implementation'
end