Skip to content

Commit

Permalink
Rename SharedMemoryObject to SysVSharedMemory and made it a mixin mod…
Browse files Browse the repository at this point in the history
…ule, remove "Atomic" naming, change execute_atomically to synchronize
  • Loading branch information
kyewei committed Nov 12, 2015
1 parent 2e9cd7a commit 4896c3c
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 74 deletions.
51 changes: 27 additions & 24 deletions ext/semian/semian_atomic_integer.c → ext/semian/semian_integer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

typedef struct {
int value;
} semian_atomic_int;
} semian_int;

static void semian_atomic_integer_bind_init_fn (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size, int prev_mem_attach_count);
static VALUE semian_atomic_integer_bind_init_fn_wrapper(VALUE self);
static VALUE semian_atomic_integer_get_value(VALUE self);
static VALUE semian_atomic_integer_set_value(VALUE self, VALUE num);
static VALUE semian_atomic_integer_increment(int argc, VALUE *argv, VALUE self);
static void semian_integer_bind_init_fn (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size, int prev_mem_attach_count);
static VALUE semian_integer_bind_init_fn_wrapper(VALUE self);
static VALUE semian_integer_get_value(VALUE self);
static VALUE semian_integer_set_value(VALUE self, VALUE num);
static VALUE semian_integer_increment(int argc, VALUE *argv, VALUE self);

static void
semian_atomic_integer_bind_init_fn (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size, int prev_mem_attach_count)
semian_integer_bind_init_fn (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size, int prev_mem_attach_count)
{
semian_atomic_int *ptr = dest;
semian_atomic_int *old = prev_data;
semian_int *ptr = dest;
semian_int *old = prev_data;
if (prev_mem_attach_count){
if (prev_data){
ptr->value = old->value;
Expand All @@ -25,16 +25,16 @@ semian_atomic_integer_bind_init_fn (size_t byte_size, void *dest, void *prev_dat
}

static VALUE
semian_atomic_integer_bind_init_fn_wrapper(VALUE self)
semian_integer_bind_init_fn_wrapper(VALUE self)
{
semian_shm_object *ptr;
TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr);
ptr->object_init_fn = &semian_atomic_integer_bind_init_fn;
ptr->object_init_fn = &semian_integer_bind_init_fn;
return self;
}

static VALUE
semian_atomic_integer_get_value(VALUE self)
semian_integer_get_value(VALUE self)
{
semian_shm_object *ptr;
TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr);
Expand All @@ -47,13 +47,13 @@ semian_atomic_integer_get_value(VALUE self)
if (!semian_shm_object_lock(self))
return Qnil;

int value = ((semian_atomic_int *)(ptr->shm_address))->value;
int value = ((semian_int *)(ptr->shm_address))->value;
semian_shm_object_unlock(self);
return INT2NUM(value);
}

static VALUE
semian_atomic_integer_set_value(VALUE self, VALUE num)
semian_integer_set_value(VALUE self, VALUE num)
{
semian_shm_object *ptr;
TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr);
Expand All @@ -67,14 +67,14 @@ semian_atomic_integer_set_value(VALUE self, VALUE num)
if (!semian_shm_object_lock(self))
return Qnil;

((semian_atomic_int *)(ptr->shm_address))->value = NUM2INT(num);
((semian_int *)(ptr->shm_address))->value = NUM2INT(num);

semian_shm_object_unlock(self);
return num;
}

static VALUE
semian_atomic_integer_increment(int argc, VALUE *argv, VALUE self)
semian_integer_increment(int argc, VALUE *argv, VALUE self)
{
VALUE num;
rb_scan_args(argc, argv, "01", &num);
Expand All @@ -93,22 +93,25 @@ semian_atomic_integer_increment(int argc, VALUE *argv, VALUE self)
if (!semian_shm_object_lock(self))
return Qnil;

((semian_atomic_int *)(ptr->shm_address))->value += NUM2INT(num);
((semian_int *)(ptr->shm_address))->value += NUM2INT(num);

semian_shm_object_unlock(self);
return self;
}

void
Init_semian_atomic_integer (void)
Init_semian_integer (void)
{
// Bind methods to AtomicInteger
// Bind methods to Integer
VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian"));
VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory"));
VALUE cSysVModule = rb_const_get(cSemianModule, rb_intern("SysV"));
VALUE cAtomicInteger = rb_const_get(cSysVModule, rb_intern("Integer"));
VALUE cInteger = rb_const_get(cSysVModule, rb_intern("Integer"));

rb_define_method(cAtomicInteger, "bind_init_fn", semian_atomic_integer_bind_init_fn_wrapper, 0);
rb_define_method(cAtomicInteger, "value", semian_atomic_integer_get_value, 0);
rb_define_method(cAtomicInteger, "value=", semian_atomic_integer_set_value, 1);
rb_define_method(cAtomicInteger, "increment", semian_atomic_integer_increment, -1);
semian_shm_object_replace_alloc(cSysVSharedMemory, cInteger);

rb_define_method(cInteger, "bind_init_fn", semian_integer_bind_init_fn_wrapper, 0);
rb_define_method(cInteger, "value", semian_integer_get_value, 0);
rb_define_method(cInteger, "value=", semian_integer_set_value, 1);
rb_define_method(cInteger, "increment", semian_integer_increment, -1);
}
37 changes: 22 additions & 15 deletions ext/semian/semian_shared_memory_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ semian_shm_object_alloc(VALUE klass)
* Implementations
*/

VALUE
semian_shm_object_replace_alloc(VALUE klass, VALUE target)
{
rb_define_alloc_func(target, semian_shm_object_alloc);
return target;
}

VALUE
semian_shm_object_sizeof(VALUE klass, VALUE type)
Expand Down Expand Up @@ -576,38 +582,39 @@ semian_shm_object_byte_size(VALUE self)
}

static VALUE
semian_shm_object_execute_atomically_with_block(VALUE self)
semian_shm_object_synchronize_with_block(VALUE self)
{
if (!rb_block_given_p())
rb_raise(rb_eArgError, "Expected block");
return rb_yield(Qnil);
}

static VALUE
semian_shm_object_execute_atomically(VALUE self) {
semian_shm_object_synchronize(VALUE self) {
if (!semian_shm_object_lock(self))
return Qnil;
return rb_ensure(semian_shm_object_execute_atomically_with_block, self, semian_shm_object_unlock_all, self);
return rb_ensure(semian_shm_object_synchronize_with_block, self, semian_shm_object_unlock_all, self);
}

void
Init_semian_shm_object (void) {

VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian"));
VALUE cSharedMemoryObject = rb_const_get(cSemianModule, rb_intern("SharedMemoryObject"));
VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory"));

rb_define_alloc_func(cSharedMemoryObject, semian_shm_object_alloc);
rb_define_method(cSharedMemoryObject, "_acquire", semian_shm_object_acquire, 3);
rb_define_method(cSharedMemoryObject, "_destroy", semian_shm_object_destroy, 0);
rb_define_method(cSharedMemoryObject, "lock", semian_shm_object_lock, 0);
rb_define_method(cSharedMemoryObject, "unlock", semian_shm_object_unlock, 0);
rb_define_method(cSharedMemoryObject, "byte_size", semian_shm_object_byte_size, 0);
//rb_define_alloc_func(cSysVSharedMemory, semian_shm_object_alloc);
rb_define_method(cSysVSharedMemory, "_acquire", semian_shm_object_acquire, 3);
rb_define_method(cSysVSharedMemory, "_destroy", semian_shm_object_destroy, 0);
rb_define_method(cSysVSharedMemory, "lock", semian_shm_object_lock, 0);
rb_define_method(cSysVSharedMemory, "unlock", semian_shm_object_unlock, 0);
rb_define_method(cSysVSharedMemory, "byte_size", semian_shm_object_byte_size, 0);

rb_define_method(cSharedMemoryObject, "semid", semian_shm_object_semid, 0);
rb_define_method(cSharedMemoryObject, "shmid", semian_shm_object_shmid, 0);
rb_define_method(cSharedMemoryObject, "_execute_atomically", semian_shm_object_execute_atomically, 0);
rb_define_method(cSysVSharedMemory, "semid", semian_shm_object_semid, 0);
rb_define_method(cSysVSharedMemory, "shmid", semian_shm_object_shmid, 0);
rb_define_method(cSysVSharedMemory, "_synchronize", semian_shm_object_synchronize, 0);

rb_define_singleton_method(cSharedMemoryObject, "_sizeof", semian_shm_object_sizeof, 1);
rb_define_singleton_method(cSysVSharedMemory, "_sizeof", semian_shm_object_sizeof, 1);
rb_define_singleton_method(cSysVSharedMemory, "replace_alloc", semian_shm_object_replace_alloc, 1);

decrement.sem_num = kSHMIndexTicketLock;
decrement.sem_op = -1;
Expand All @@ -617,7 +624,7 @@ Init_semian_shm_object (void) {
increment.sem_op = 1;
increment.sem_flg = SEM_UNDO;

Init_semian_atomic_integer();
Init_semian_integer();
Init_semian_sliding_window();
}

3 changes: 2 additions & 1 deletion ext/semian/semian_shared_memory_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ semian_shm_object_type;
*/

VALUE semian_shm_object_sizeof(VALUE klass, VALUE type);
VALUE semian_shm_object_replace_alloc(VALUE klass, VALUE target);

VALUE semian_shm_object_acquire(VALUE self, VALUE id, VALUE byte_size, VALUE permissions);
VALUE semian_shm_object_destroy(VALUE self);
Expand All @@ -33,5 +34,5 @@ void semian_shm_object_delete_memory_inner (semian_shm_object *ptr, int should_u
VALUE semian_shm_object_delete_memory (VALUE self);
VALUE semian_shm_object_check_and_resize_if_needed (VALUE self);

void Init_semian_atomic_integer (void);
void Init_semian_integer (void);
void Init_semian_sliding_window (void);
3 changes: 3 additions & 0 deletions ext/semian/semian_sliding_window.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,11 @@ Init_semian_sliding_window (void)
{
VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian"));
VALUE cSysVModule = rb_const_get(cSemianModule, rb_intern("SysV"));
VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory"));
VALUE cSlidingWindow = rb_const_get(cSysVModule, rb_intern("SlidingWindow"));

semian_shm_object_replace_alloc(cSysVSharedMemory, cSlidingWindow);

rb_define_method(cSlidingWindow, "bind_init_fn", semian_sliding_window_bind_init_fn_wrapper, 0);
rb_define_method(cSlidingWindow, "size", semian_sliding_window_size, 0);
rb_define_method(cSlidingWindow, "max_size", semian_sliding_window_max_size, 0);
Expand Down
2 changes: 1 addition & 1 deletion lib/semian.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ def resources
require 'semian/protected_resource'
require 'semian/unprotected_resource'
require 'semian/platform'
require 'semian/shared_memory_object'
require 'semian/simple_sliding_window'
require 'semian/simple_integer'
require 'semian/simple_state'
require 'semian/sysv_shared_memory'
require 'semian/sysv_sliding_window'
require 'semian/sysv_integer'
require 'semian/sysv_state'
Expand Down
2 changes: 1 addition & 1 deletion lib/semian/circuit_breaker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def error_timeout_expired?
end

def push_time(window, duration:, time: Time.now)
@errors.execute_atomically do # Store an integer amount of milliseconds since epoch
@errors.synchronize do # Store an integer amount of milliseconds since epoch
window.shift while window.first && window.first / 1000 + duration < time.to_i
window << (time.to_f * 1000).to_i
end
Expand Down
8 changes: 2 additions & 6 deletions lib/semian/simple_integer.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Semian
module Simple
class Integer < SharedMemoryObject #:nodoc:
class Integer #:nodoc:
attr_accessor :value

def initialize
Expand All @@ -16,11 +16,7 @@ def reset
end

def destroy
if shared?
super
else
@value = 0
end
reset
end
end
end
Expand Down
20 changes: 9 additions & 11 deletions lib/semian/simple_sliding_window.rb
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
require 'forwardable'

module Semian
module Simple
class SlidingWindow < SharedMemoryObject #:nodoc:
class SlidingWindow #:nodoc:
extend Forwardable

def_delegators :@window, :size, :pop, :shift, :first, :last
attr_reader :max_size

# A sliding window is a structure that stores the most @max_size recent timestamps
# like this: if @max_size = 4, current time is 10, @window =[5,7,9,10].
# Another push of (11) at 11 sec would make @window [7,9,10,11], shifting off 5.

def initialize(max_size:)
@max_size = max_size
@window = []
end

# A sliding window is a structure that stores the most @max_size recent timestamps
# like this: if @max_size = 4, current time is 10, @window =[5,7,9,10].
# Another push of (11) at 11 sec would make @window [7,9,10,11], shifting off 5.

def resize_to(size)
raise ArgumentError.new('size must be larger than 0') if size < 1
@max_size = size
Expand All @@ -31,16 +33,12 @@ def push(time_ms)
alias_method :<<, :push

def clear
@window = []
@window.clear
self
end

def destroy
if shared?
super
else
clear
end
clear
end
end
end
Expand Down
4 changes: 1 addition & 3 deletions lib/semian/simple_state.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
require 'forwardable'

module Semian
module Simple
class State < SharedMemoryObject #:nodoc:
class State #:nodoc:
def initialize
reset
end
Expand Down
2 changes: 2 additions & 0 deletions lib/semian/sysv_integer.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Semian
module SysV
class Integer < Semian::Simple::Integer #:nodoc:
include SysVSharedMemory

def initialize(name:, permissions:)
data_layout = [:int]
super() unless acquire_memory_object(name, data_layout, permissions)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Semian
class SharedMemoryObject #:nodoc:
module SysVSharedMemory #:nodoc:
@type_size = {}
def self.sizeof(type)
size = (@type_size[type.to_sym] ||= (respond_to?(:_sizeof) ? _sizeof(type.to_sym) : 0))
Expand All @@ -15,30 +15,34 @@ def shmid
-1
end

def execute_atomically(&proc)
if respond_to?(:_execute_atomically) && @using_shared_memory
return _execute_atomically(&proc)
def synchronize(&proc)
if respond_to?(:_synchronize) && @using_shared_memory
return _synchronize(&proc)
else
call_with_mutex(&proc)
yield if block_given?
end
end

alias_method :transaction, :execute_atomically
alias_method :transaction, :synchronize

def destroy
_destroy if respond_to?(:_destroy) && @using_shared_memory
if respond_to?(:_destroy) && @using_shared_memory
_destroy
else
super
end
end

private

def shared?
@using_shared_memory ||= Semian.semaphores_enabled? && @using_shared_memory
@using_shared_memory
end

def acquire_memory_object(name, data_layout, permissions)
return @using_shared_memory = false unless Semian.semaphores_enabled? && respond_to?(:_acquire)

byte_size = data_layout.inject(0) { |sum, type| sum + ::Semian::SharedMemoryObject.sizeof(type) }
byte_size = data_layout.inject(0) { |sum, type| sum + ::Semian::SysVSharedMemory.sizeof(type) }
raise TypeError.new("Given data layout is 0 bytes: #{data_layout.inspect}") if byte_size <= 0
# Calls C layer to acquire/create a memory block, calling #bind_init_fn in the process, see below
_acquire(name, byte_size, permissions)
Expand Down
4 changes: 3 additions & 1 deletion lib/semian/sysv_sliding_window.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
module Semian
module SysV
class SlidingWindow < Semian::Simple::SlidingWindow #:nodoc:
include SysVSharedMemory

def initialize(max_size:, name:, permissions:)
data_layout = [:int, :int].concat(Array.new(max_size, :long))
super(max_size) unless acquire_memory_object(name, data_layout, permissions)
super(max_size: max_size) unless acquire_memory_object(name, data_layout, permissions)
end
end
end
Expand Down
Loading

0 comments on commit 4896c3c

Please sign in to comment.