diff --git a/ext/semian/extconf.rb b/ext/semian/extconf.rb index 597e1ff5..55f85424 100644 --- a/ext/semian/extconf.rb +++ b/ext/semian/extconf.rb @@ -23,7 +23,7 @@ have_func 'rb_thread_blocking_region' have_func 'rb_thread_call_without_gvl' -$CFLAGS = "-D_GNU_SOURCE -Werror -Wall " +$CFLAGS = "-D_GNU_SOURCE -Werror -Wall -std=c99 " if ENV.key?('DEBUG') $CFLAGS << "-O0 -g" else diff --git a/ext/semian/semian.c b/ext/semian/semian.c index 821872a4..ef888477 100644 --- a/ext/semian/semian.c +++ b/ext/semian/semian.c @@ -1,38 +1,6 @@ -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -#include - -union semun { - int val; /* Value for SETVAL */ - struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ - unsigned short *array; /* Array for GETALL, SETALL */ - struct seminfo *__buf; /* Buffer for IPC_INFO - (Linux-specific) */ -}; - -#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) -// 2.0 -#include -#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b)) -#elif defined(HAVE_RB_THREAD_BLOCKING_REGION) - // 1.9 -typedef VALUE (*my_blocking_fn_t)(void*); -#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b)) -#endif +#include "semian.h" static ID id_timeout; -static VALUE eSyscall, eTimeout, eInternal; static int system_max_semaphore_count; static const int kIndexTickets = 0; @@ -48,7 +16,7 @@ typedef struct { char *name; } semian_resource_t; -static key_t +key_t generate_key(const char *name) { union { @@ -67,7 +35,7 @@ ms_to_timespec(long ms, struct timespec *ts) ts->tv_nsec = (ms % 1000) * 1000000; } -static void +void raise_semian_syscall_error(const char *syscall, int error_num) { rb_raise(eSyscall, "%s failed, errno: %d (%s)", syscall, error_num, strerror(error_num)); @@ -115,7 +83,7 @@ semian_resource_alloc(VALUE klass) return obj; } -static void +void set_semaphore_permissions(int sem_id, int permissions) { union semun sem_opts; @@ -447,6 +415,9 @@ semian_resource_id(VALUE self) return LONG2FIX(res->sem_id); } + +void Init_semian_shm_object(); + void Init_semian() { VALUE cSemian, cResource; @@ -504,4 +475,6 @@ void Init_semian() /* Maximum number of tickets available on this system. */ rb_define_const(cSemian, "MAX_TICKETS", INT2FIX(system_max_semaphore_count)); + + Init_semian_shm_object(); } diff --git a/ext/semian/semian.h b/ext/semian/semian.h new file mode 100644 index 00000000..d6eca09e --- /dev/null +++ b/ext/semian/semian.h @@ -0,0 +1,48 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include + +#include +#include + +#include + +union semun { + int val; /* Value for SETVAL */ + struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ + unsigned short *array; /* Array for GETALL, SETALL */ + struct seminfo *__buf; /* Buffer for IPC_INFO + (Linux-specific) */ +}; + +#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) +// 2.0 +#include +#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b)) +#elif defined(HAVE_RB_THREAD_BLOCKING_REGION) + // 1.9 +typedef VALUE (*my_blocking_fn_t)(void*); +#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b)) +#endif + +VALUE eSyscall, eTimeout, eInternal; + +key_t +generate_key(const char *name); + +void +raise_semian_syscall_error(const char *syscall, int error_num); + +void +set_semaphore_permissions(int sem_id, int permissions); diff --git a/ext/semian/semian_integer.c b/ext/semian/semian_integer.c new file mode 100644 index 00000000..d9d0ce9b --- /dev/null +++ b/ext/semian/semian_integer.c @@ -0,0 +1,105 @@ +#include "semian_shared_memory_object.h" + +typedef struct { + int value; +} semian_int; + +static void semian_integer_initialize_memory (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size); +static VALUE semian_integer_bind_initialize_memory_callback(VALUE self); +static VALUE semian_integer_get_value(VALUE self); +static VALUE semian_integer_set_value(VALUE self, VALUE num); +static VALUE semian_integer_increment(int argc, VALUE *argv, VALUE self); + +static void +semian_integer_initialize_memory (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size) +{ + semian_int *ptr = dest; + semian_int *old = prev_data; + if (prev_data){ + ptr->value = old->value; + } else { + ptr->value=0; + } +} + +static VALUE +semian_integer_bind_initialize_memory_callback(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + ptr->initialize_memory = &semian_integer_initialize_memory; + return self; +} + +static VALUE +semian_integer_get_value(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + int value = ((semian_int *)(ptr->shm_address))->value; + return INT2NUM(value); +} + +static VALUE +semian_integer_set_value(VALUE self, VALUE num) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + if (TYPE(num) != T_FIXNUM && TYPE(num) != T_FLOAT) + return Qnil; + ((semian_int *)(ptr->shm_address))->value = NUM2INT(num); + return num; +} + +static VALUE +semian_integer_reset(VALUE self) +{ + return semian_integer_set_value(self, INT2NUM(0)); +} + +static VALUE +semian_integer_increment(int argc, VALUE *argv, VALUE self) +{ + VALUE num; + rb_scan_args(argc, argv, "01", &num); + if (num == Qnil) + num = INT2NUM(1); + + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + if (TYPE(num) != T_FIXNUM && TYPE(num) != T_FLOAT) + return Qnil; + ((semian_int *)(ptr->shm_address))->value += NUM2INT(num); + return self; +} + +static VALUE +semian_integer_calculate_byte_size(VALUE klass) +{ + return SIZET2NUM(sizeof(int)); +} + +void +Init_semian_integer (void) +{ + // Bind methods to Integer + VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian")); + VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory")); + VALUE cSysVModule = rb_const_get(cSemianModule, rb_intern("SysV")); + VALUE cInteger = rb_const_get(cSysVModule, rb_intern("Integer")); + + semian_shm_object_replace_alloc(cSysVSharedMemory, cInteger); + + rb_define_private_method(cInteger, "bind_initialize_memory_callback", semian_integer_bind_initialize_memory_callback, 0); + define_method_with_synchronize(cInteger, "value", semian_integer_get_value, 0); + define_method_with_synchronize(cInteger, "value=", semian_integer_set_value, 1); + define_method_with_synchronize(cInteger, "reset", semian_integer_reset, 0); + define_method_with_synchronize(cInteger, "increment", semian_integer_increment, -1); + rb_define_method(cInteger, "calculate_byte_size", semian_integer_calculate_byte_size, 0); +} diff --git a/ext/semian/semian_shared_memory_object.c b/ext/semian/semian_shared_memory_object.c new file mode 100644 index 00000000..9adf6a25 --- /dev/null +++ b/ext/semian/semian_shared_memory_object.c @@ -0,0 +1,422 @@ +#include "semian_shared_memory_object.h" + +const int kSHMSemaphoreCount = 1; // semaphores to be acquired +const int kSHMTicketMax = 1; +const int kSHMInitializeWaitTimeout = 5; /* seconds */ +const int kSHMIndexTicketLock = 0; +const int kSHMInternalTimeout = 5; /* seconds */ +const int kSHMRestoreLockStateRetryCount = 5; // perform semtimedop 5 times max + +static struct sembuf decrement; // = { kSHMIndexTicketLock, -1, SEM_UNDO}; +static struct sembuf increment; // = { kSHMIndexTicketLock, 1, SEM_UNDO}; + +/* + * Functions that handle type and memory +*/ +static void semian_shm_object_mark(void *ptr); +static void semian_shm_object_free(void *ptr); +static size_t semian_shm_object_memsize(const void *ptr); + +const rb_data_type_t +semian_shm_object_type = { + "semian_shm_object", + { + semian_shm_object_mark, + semian_shm_object_free, + semian_shm_object_memsize + }, + NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY +}; + +static void +semian_shm_object_mark(void *ptr) +{ + /* noop */ +} +static void +semian_shm_object_free(void *ptr) +{ + semian_shm_object *data = (semian_shm_object *)ptr; + // Under normal circumstances, memory use should be in the order of bytes, and shouldn't + // increase if the same key/id is used, so there is no need to delete the shared memory + // (also raises a concurrency-related bug: "object allocation during garbage collection phase") + xfree(data); +} +static size_t +semian_shm_object_memsize(const void *ptr) +{ + return sizeof(semian_shm_object); +} +static VALUE +semian_shm_object_alloc(VALUE klass) +{ + VALUE obj; + semian_shm_object *ptr; + obj = TypedData_Make_Struct(klass, semian_shm_object, &semian_shm_object_type, ptr); + return obj; +} + +/* + * Implementations + */ + +VALUE +semian_shm_object_replace_alloc(VALUE klass, VALUE target) +{ + rb_define_alloc_func(target, semian_shm_object_alloc); + return target; +} + +VALUE +semian_shm_object_acquire(VALUE self, VALUE name, VALUE byte_size, VALUE permissions) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + if (TYPE(name) != T_SYMBOL && TYPE(name) != T_STRING) + rb_raise(rb_eTypeError, "id must be a symbol or string"); + if (TYPE(byte_size) != T_FIXNUM) + rb_raise(rb_eTypeError, "expected integer for byte_size"); + if (TYPE(permissions) != T_FIXNUM) + rb_raise(rb_eTypeError, "expected integer for permissions"); + + if (NUM2SIZET(byte_size) <= 0) + rb_raise(rb_eArgError, "total size must be larger than 0"); + + const char *id_str = NULL; + if (TYPE(name) == T_SYMBOL) { + id_str = rb_id2name(rb_to_id(name)); + } else if (TYPE(name) == T_STRING) { + id_str = RSTRING_PTR(name); + } + ptr->key = generate_key(id_str); + ptr->byte_size = NUM2SIZET(byte_size); // byte_size >=1 or error would have been raised earlier + ptr->semid = -1; // id's default to -1 + ptr->shmid = -1; + ptr->shm_address = 0; // address defaults to NULL + ptr->lock_count = 0; // Emulates recursive mutex, 0->1 locks, 1->0 unlocks, rest noops + ptr->permissions = FIX2LONG(permissions); + ptr->initialize_memory = NULL; + + // Concrete classes must implement this in a subclass in C to bind a callback function of type + // void (*initialize_memory)(size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size); + // to location ptr->initialize_memory, where ptr is a semian_shm_object* + // It is called when memory needs to be initialized or resized, possibly using previous memory + rb_funcall(self, rb_intern("bind_initialize_memory_callback"), 0); + if (NULL == ptr->initialize_memory) + rb_raise(rb_eNotImpError, "callback was not bound to ptr->initialize_memory"); + semian_shm_object_acquire_semaphore(self); + semian_shm_object_synchronize(self); + + return Qtrue; +} + +VALUE +semian_shm_object_destroy(VALUE self) +{ + VALUE result = semian_shm_object_cleanup_memory(self); + if (!result) + return Qfalse; + result = semian_shm_object_delete_semaphore(self); + return result; +} + +/* + * Create or acquire previously made semaphore + */ + +static int +create_semaphore_and_initialize_and_set_permissions(int key, int permissions) +{ + int semid = 0; + int flags = 0; + + flags = IPC_EXCL | IPC_CREAT | permissions; + + semid = semget(key, kSHMSemaphoreCount, flags); + if (semid >= 0) { + if (-1 == semctl(semid, 0, SETVAL, kSHMTicketMax)) { + rb_warn("semctl: failed to set semaphore with semid %d, position 0 to %d", semid, 1); + raise_semian_syscall_error("semctl()", errno); + } + } else if (semid == -1 && errno == EEXIST) { + flags &= ~IPC_EXCL; + semid = semget(key, kSHMSemaphoreCount, flags); + } + + if (-1 != semid){ + set_semaphore_permissions(semid, permissions); // Borrowed from semian.c + } + + return semid; +} + + +VALUE +semian_shm_object_acquire_semaphore (VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + if (-1 == (ptr->semid = create_semaphore_and_initialize_and_set_permissions(ptr->key, ptr->permissions))) { + raise_semian_syscall_error("semget()", errno); + } + return self; +} + +VALUE +semian_shm_object_delete_semaphore(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (-1 == ptr->semid){ // do nothing if semaphore not acquired + return Qfalse; + } + if (-1 == semctl(ptr->semid, 0, IPC_RMID)) { + if (EIDRM == errno) { + rb_warn("semctl: failed to delete semaphore set with semid %d: already removed", ptr->semid); + raise_semian_syscall_error("semctl()", errno); + ptr->semid = -1; + } else { + rb_warn("semctl: failed to remove semaphore with semid %d, errno %d (%s)", ptr->semid, errno, strerror(errno)); + raise_semian_syscall_error("semctl()", errno); + } + } else { + ptr->semid = -1; + } + return self; +} + +/* + * lock & unlock functions, should be called like + * (VALUE) WITHOUT_GVL(semian_shm_object_unlock_without_gvl, (void *)ptr, RUBY_UBF_IO, NULL) + */ + +static void * +semian_shm_object_lock_without_gvl(void *v_ptr) +{ + semian_shm_object *ptr = v_ptr; + if (-1 == ptr->semid) { + rb_raise(eInternal, "semid not set, errno %d: (%s)", errno, strerror(errno)); + } + struct timespec ts = { 0 }; + ts.tv_sec = kSHMInternalTimeout; + if (0 != ptr->lock_count || -1 != semtimedop(ptr->semid, &decrement, 1, &ts)) { + ptr->lock_count += 1; + } else { + rb_raise(eInternal, "error acquiring semaphore lock to mutate circuit breaker structure, %d: (%s)", errno, strerror(errno)); + } + return (void *)Qtrue; +} + +static void * +semian_shm_object_unlock_without_gvl(void *v_ptr) +{ + semian_shm_object *ptr = v_ptr; + if (-1 == ptr->semid){ + rb_raise(eInternal, "semid not set, errno %d: (%s)", errno, strerror(errno)); + } + if (1 != ptr->lock_count || -1 != semop(ptr->semid, &increment, 1)) { // No need for semtimedop + ptr->lock_count -= 1; + } else { + rb_raise(eInternal, "error unlocking semaphore, %d (%s)", errno, strerror(errno)); + } + return (void *)Qtrue; +} + +/* + * Wrap the lock-unlock functionality in ensures + */ + +typedef struct { // Workaround rb_ensure only allows one argument for each callback function + int pre_block_lock_count_state; + semian_shm_object *ptr; +} lock_status; + +static VALUE +semian_shm_object_synchronize_with_block(VALUE self) +{ + semian_shm_object_synchronize_memory_and_size(self, Qfalse); + if (!rb_block_given_p()) + return Qnil; + return rb_yield(Qnil); +} + +static VALUE +semian_shm_object_synchronize_restore_lock_status(VALUE v_status) +{ + lock_status *status = (lock_status *) v_status; + int tries = 0; + while (++tries < kSHMRestoreLockStateRetryCount && status->ptr->lock_count > status->pre_block_lock_count_state) + return (VALUE) WITHOUT_GVL(semian_shm_object_unlock_without_gvl, (void *)(status->ptr), RUBY_UBF_IO, NULL); + if (tries >= kSHMRestoreLockStateRetryCount) + rb_raise(eSyscall, "Failed to restore lock status after %d tries", kSHMRestoreLockStateRetryCount); + tries = 0; + while (++tries < kSHMRestoreLockStateRetryCount && status->ptr->lock_count < status->pre_block_lock_count_state) + return (VALUE) WITHOUT_GVL(semian_shm_object_lock_without_gvl, (void *)(status->ptr), RUBY_UBF_IO, NULL); + if (tries >= kSHMRestoreLockStateRetryCount) + rb_raise(eSyscall, "Failed to restore lock status after %d tries", kSHMRestoreLockStateRetryCount); + return Qnil; +} + +VALUE +semian_shm_object_synchronize(VALUE self) { // receives a block + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + lock_status status = { ptr->lock_count, ptr }; + WITHOUT_GVL(semian_shm_object_lock_without_gvl, (void *)ptr, RUBY_UBF_IO, NULL); + return rb_ensure(semian_shm_object_synchronize_with_block, self, semian_shm_object_synchronize_restore_lock_status, (VALUE)&status); +} + +void +define_method_with_synchronize(VALUE klass, const char *name, VALUE (*func)(ANYARGS), int argc) +{ + rb_define_method(klass, name, func, argc); + rb_funcall(klass, rb_intern("do_with_sync"), 1, rb_str_new2(name)); +} + +/* + * Memory functions + */ + +VALUE +semian_shm_object_synchronize_memory_and_size(VALUE self, VALUE is_master_obj) { + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + struct shmid_ds shm_info = { }; + const int SHMMIN = 1; // minimum size of shared memory on linux + key_t key = ptr->key; + + int is_master = RTEST(is_master_obj); // Controls whether synchronization is master or slave (both fast-forward, only master resizes/initializes) + is_master |= (-1 == ptr->shmid) && (0 == ptr->shm_address); + + int shmid_out_of_sync = 0; + shmid_out_of_sync |= (-1 == ptr->shmid) && (0 == ptr->shm_address); // If not attached at all + if ((-1 != ptr->shmid) && (-1 != shmctl(ptr->shmid, IPC_STAT, &shm_info))) { + shmid_out_of_sync |= shm_info.shm_perm.mode & SHM_DEST && // If current attached memory is marked for deletion + ptr->shmid != shmget(key, SHMMIN, IPC_CREAT | ptr->permissions); // If shmid not in sync + } + + size_t requested_byte_size = ptr->byte_size; + int first_sync = (-1 == ptr->shmid) && (shmid_out_of_sync); + + if (shmid_out_of_sync) { // We need to fast-forward to the current state and memory attachment + semian_shm_object_cleanup_memory(self); + if ((-1 == (ptr->shmid = shmget(key, SHMMIN, ptr->permissions)))) { + if ((-1 == (ptr->shmid = shmget(key, ptr->byte_size, IPC_CREAT | IPC_EXCL | ptr->permissions)))) { + rb_raise(eSyscall, "shmget failed to create or attach current memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } // If we can neither create a new memory block nor get the current one with a key, something's wrong + } + if ((void *)-1 == (ptr->shm_address = shmat(ptr->shmid, NULL, 0))) { + ptr->shm_address = NULL; + rb_raise(eSyscall, "shmat failed to mount current memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + } + + if (-1 == shmctl(ptr->shmid, IPC_STAT, &shm_info)){ + rb_raise(eSyscall, "shmctl failed to inspect current memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + ptr->byte_size = shm_info.shm_segsz; + + int old_mem_attach_count = shm_info.shm_nattch; + + if (is_master) { + if (ptr->byte_size == requested_byte_size && first_sync && 1 == old_mem_attach_count) { + ptr->initialize_memory(ptr->byte_size, ptr->shm_address, NULL, 0); // We clear the memory if worker is first to attach + } else if (ptr->byte_size != requested_byte_size) { + void *old_shm_address = ptr->shm_address; + size_t old_byte_size = ptr->byte_size; + unsigned char old_memory_content[old_byte_size]; // It is unsafe to use malloc here to store a copy of the memory + memcpy(old_memory_content, old_shm_address, old_byte_size); + semian_shm_object_cleanup_memory(self); // This may fail + + if (-1 == (ptr->shmid = shmget(key, requested_byte_size, IPC_CREAT | IPC_EXCL | ptr->permissions))) { + rb_raise(eSyscall, "shmget failed to create new resized memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + if ((void *)-1 == (ptr->shm_address = shmat(ptr->shmid, NULL, 0))) { + rb_raise(eSyscall, "shmat failed to mount new resized memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + ptr->byte_size = requested_byte_size; + + ptr->initialize_memory(ptr->byte_size, ptr->shm_address, old_memory_content, old_byte_size); + } + } + return self; +} + +static VALUE +semian_shm_object_cleanup_memory_inner(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + if (0 != ptr->shm_address && -1 == shmdt(ptr->shm_address)) { + rb_raise(eSyscall,"shmdt: no attached memory at %p, errno %d (%s)", ptr->shm_address, errno, strerror(errno)); + } + ptr->shm_address = 0; + + if (-1 != ptr->shmid && -1 == shmctl(ptr->shmid, IPC_RMID, 0)) { + if (errno != EINVAL) + rb_raise(eSyscall,"shmctl: error flagging memory for removal with shmid %d, errno %d (%s)", ptr->shmid, errno, strerror(errno)); + } + ptr->shmid = -1; + return Qnil; +} + +VALUE +semian_shm_object_cleanup_memory(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + lock_status status = { ptr->lock_count, ptr }; + WITHOUT_GVL(semian_shm_object_lock_without_gvl, (void *)ptr, RUBY_UBF_IO, NULL); + return rb_ensure(semian_shm_object_cleanup_memory_inner, self, semian_shm_object_synchronize_restore_lock_status, (VALUE)&status); +} + +static VALUE +semian_shm_object_semid(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (-1 == ptr->semid) + return -1; + semian_shm_object_synchronize(self); + return INT2NUM(ptr->semid); +} +static VALUE +semian_shm_object_shmid(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + return INT2NUM(ptr->shmid); +} + +void +Init_semian_shm_object (void) { + + VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian")); + VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory")); + + // Concrete classes must call acquire_memory_object() before accessing shared memory + // If SysV is enabled, a C method overrides this stub and returns true if acquiring succeeds + rb_define_method(cSysVSharedMemory, "acquire_memory_object", semian_shm_object_acquire, 3); + + rb_define_method(cSysVSharedMemory, "destroy", semian_shm_object_destroy, 0); + rb_define_method(cSysVSharedMemory, "synchronize", semian_shm_object_synchronize, 0); + + rb_define_method(cSysVSharedMemory, "semid", semian_shm_object_semid, 0); + define_method_with_synchronize(cSysVSharedMemory, "shmid", semian_shm_object_shmid, 0); + + rb_define_singleton_method(cSysVSharedMemory, "replace_alloc", semian_shm_object_replace_alloc, 1); + + decrement.sem_num = kSHMIndexTicketLock; + decrement.sem_op = -1; + decrement.sem_flg = SEM_UNDO; + + increment.sem_num = kSHMIndexTicketLock; + increment.sem_op = 1; + increment.sem_flg = SEM_UNDO; + + Init_semian_integer(); +} diff --git a/ext/semian/semian_shared_memory_object.h b/ext/semian/semian_shared_memory_object.h new file mode 100644 index 00000000..b7dad02b --- /dev/null +++ b/ext/semian/semian_shared_memory_object.h @@ -0,0 +1,34 @@ +#include "semian.h" + +typedef struct { + //semaphore, shared memory data and pointer + key_t key; + size_t byte_size; + int lock_count; // lock only done from 0 -> 1, unlock only done from 1 -> 0, so we can 'lock' multiple times (such as in nesting functions) without actually locking + int permissions; + int semid; + int shmid; + void (*initialize_memory)(size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size); + void *shm_address; +} semian_shm_object; + +extern const rb_data_type_t +semian_shm_object_type; + +/* + * Headers + */ + +VALUE semian_shm_object_replace_alloc(VALUE klass, VALUE target); + +VALUE semian_shm_object_acquire(VALUE self, VALUE id, VALUE byte_size, VALUE permissions); +VALUE semian_shm_object_destroy(VALUE self); +VALUE semian_shm_object_acquire_semaphore (VALUE self); +VALUE semian_shm_object_delete_semaphore(VALUE self); +VALUE semian_shm_object_cleanup_memory (VALUE self); +VALUE semian_shm_object_synchronize_memory_and_size (VALUE self, VALUE is_master); + +VALUE semian_shm_object_synchronize(VALUE self); +void define_method_with_synchronize(VALUE klass, const char *name, VALUE (*func)(ANYARGS), int argc); + +void Init_semian_integer (void); diff --git a/lib/semian.rb b/lib/semian.rb index 9d4076e0..bb86a3b4 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -160,6 +160,8 @@ def resources require 'semian/simple_integer' require 'semian/simple_state' if Semian.semaphores_enabled? + require 'semian/sysv_shared_memory' + require 'semian/sysv_integer' require 'semian/semian' else Semian::MAX_TICKETS = 0 diff --git a/lib/semian/simple_integer.rb b/lib/semian/simple_integer.rb index aff73220..cb4403e9 100644 --- a/lib/semian/simple_integer.rb +++ b/lib/semian/simple_integer.rb @@ -3,7 +3,7 @@ module Simple class Integer #:nodoc: attr_accessor :value - def initialize + def initialize(**) reset end diff --git a/lib/semian/simple_sliding_window.rb b/lib/semian/simple_sliding_window.rb index 2507bfb8..26aea5fb 100644 --- a/lib/semian/simple_sliding_window.rb +++ b/lib/semian/simple_sliding_window.rb @@ -10,7 +10,7 @@ class SlidingWindow #:nodoc: # like this: if @max_size = 4, current time is 10, @window =[5,7,9,10]. # Another push of (11) at 11 sec would make @window [7,9,10,11], shifting off 5. - def initialize(max_size:) + def initialize(max_size:, **) @max_size = max_size @window = [] end diff --git a/lib/semian/simple_state.rb b/lib/semian/simple_state.rb index d97f4960..cefea9dd 100644 --- a/lib/semian/simple_state.rb +++ b/lib/semian/simple_state.rb @@ -1,7 +1,7 @@ module Semian module Simple class State #:nodoc: - def initialize + def initialize(**) reset end diff --git a/lib/semian/sysv_integer.rb b/lib/semian/sysv_integer.rb new file mode 100644 index 00000000..15a4b7eb --- /dev/null +++ b/lib/semian/sysv_integer.rb @@ -0,0 +1,11 @@ +module Semian + module SysV + class Integer < Semian::Simple::Integer #:nodoc: + include SysVSharedMemory + + def initialize(name:, permissions:) + acquire_memory_object(name, calculate_byte_size, permissions) + end + end + end +end diff --git a/lib/semian/sysv_shared_memory.rb b/lib/semian/sysv_shared_memory.rb new file mode 100644 index 00000000..dbbbbae1 --- /dev/null +++ b/lib/semian/sysv_shared_memory.rb @@ -0,0 +1,53 @@ +module Semian + module SysVSharedMemory #:nodoc: + module SysVSynchronizeHelper + # This is a helper method for wrapping a method in :synchronize + # Its usage is to be called from C: where rb_define_method() is originally + # used, define_method_with_synchronize() is used instead, which calls this + def do_with_sync(*names) + names.each do |name| + new_name = "#{name}_inner" + alias_method new_name, name + private new_name + define_method(name) do |*args, &block| + synchronize do + method(new_name).call(*args, &block) + end + end + end + end + end + + extend SysVSynchronizeHelper + + def self.included(base) + base.extend(SysVSynchronizeHelper) + end + + def semid + -1 + end + + def shmid + -1 + end + + def synchronize + yield if block_given? + end + + def destroy + super + end + + private + + def acquire_memory_object(*) + raise NotImplementedError + end + + def bind_initialize_memory_callback + raise NotImplementedError + end + end +end diff --git a/test/sysv_integer_test.rb b/test/sysv_integer_test.rb new file mode 100644 index 00000000..2e4f4fec --- /dev/null +++ b/test/sysv_integer_test.rb @@ -0,0 +1,58 @@ +require 'test_helper' + +class TestSysVInteger < MiniTest::Unit::TestCase + KLASS = ::Semian::SysV::Integer + + def setup + @integer = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + @integer.reset + end + + def teardown + @integer.destroy + end + + include TestSimpleInteger::IntegerTestCases + + def test_memory_is_shared + integer_2 = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + integer_2.value = 100 + assert_equal 100, @integer.value + @integer.value = 200 + assert_equal 200, integer_2.value + @integer.value = 0 + assert_equal 0, integer_2.value + end + + def test_memory_not_reset_when_at_least_one_worker_using_it + @integer.value = 109 + integer_2 = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + assert_equal @integer.value, integer_2.value + + reader, writer = IO.pipe + pid = fork do + reader.close + integer_3 = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + assert_equal 109, integer_3.value + integer_3.value = 110 + writer.puts "Done" + writer.close + sleep + end + + reader.gets + Process.kill(9, pid) + Process.waitall + assert_equal 110, integer_2.value + end + + def test_memory_reset_when_no_workers_using_it + fork do + integer = KLASS.new(name: 'TestSysVInteger_2', permissions: 0660) + integer.value = 120 + end + Process.waitall + @integer = KLASS.new(name: 'TestSysVInteger_2', permissions: 0660) + assert_equal 0, @integer.value + end +end