diff --git a/.gitignore b/.gitignore index 7631db29c..c1e5b1370 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ /html/ Gemfile.lock vendor/ +*.swp +*.swo diff --git a/.travis.yml b/.travis.yml index 9b0871f45..350c4b8f0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: ruby sudo: true before_install: + - gem update --system - gem install bundler - scripts/install_toxiproxy.sh diff --git a/ext/semian/semian.c b/ext/semian/semian.c index 2a346bf6f..9272d61f3 100644 --- a/ext/semian/semian.c +++ b/ext/semian/semian.c @@ -1,457 +1,4 @@ -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -#include - -union semun { - int val; /* Value for SETVAL */ - struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ - unsigned short *array; /* Array for GETALL, SETALL */ - struct seminfo *__buf; /* Buffer for IPC_INFO - (Linux-specific) */ -}; - -#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) -// 2.0 -#include -#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b)) -#elif defined(HAVE_RB_THREAD_BLOCKING_REGION) - // 1.9 -typedef VALUE (*my_blocking_fn_t)(void*); -#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b)) -#endif - -static ID id_timeout; -static VALUE eSyscall, eTimeout, eInternal; -static int system_max_semaphore_count; - -typedef enum -{ - SI_SEM_TICKETS, // semaphore for the tickets currently issued - SI_SEM_CONFIGURED_TICKETS, // semaphore to track the desired number of tickets available for issue - SI_SEM_LOCK, // metadata lock to act as a mutex, ensuring thread-safety for updating other semaphores - SI_NUM_SEMAPHORES // always leave this as last entry for count to be accurate -} semaphore_indices; - -typedef struct { - int sem_id; - struct timespec timeout; - int error; - char *name; -} semian_resource_t; - -static key_t -generate_key(const char *name) -{ - union { - unsigned char str[SHA_DIGEST_LENGTH]; - key_t key; - } digest; - SHA1((const unsigned char *) name, strlen(name), digest.str); - /* TODO: compile-time assertion that sizeof(key_t) > SHA_DIGEST_LENGTH */ - return digest.key; -} - -static void -ms_to_timespec(long ms, struct timespec *ts) -{ - ts->tv_sec = ms / 1000; - ts->tv_nsec = (ms % 1000) * 1000000; -} - -static void -raise_semian_syscall_error(const char *syscall, int error_num) -{ - rb_raise(eSyscall, "%s failed, errno: %d (%s)", syscall, error_num, strerror(error_num)); -} - -static void -semian_resource_mark(void *ptr) -{ - /* noop */ -} - -static void -semian_resource_free(void *ptr) -{ - semian_resource_t *res = (semian_resource_t *) ptr; - if (res->name) { - free(res->name); - res->name = NULL; - } - xfree(res); -} - -static size_t -semian_resource_memsize(const void *ptr) -{ - return sizeof(semian_resource_t); -} - -static const rb_data_type_t -semian_resource_type = { - "semian_resource", - { - semian_resource_mark, - semian_resource_free, - semian_resource_memsize - }, - NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY -}; - -static VALUE -semian_resource_alloc(VALUE klass) -{ - semian_resource_t *res; - VALUE obj = TypedData_Make_Struct(klass, semian_resource_t, &semian_resource_type, res); - return obj; -} - -static void -set_semaphore_permissions(int sem_id, int permissions) -{ - union semun sem_opts; - struct semid_ds stat_buf; - - sem_opts.buf = &stat_buf; - semctl(sem_id, 0, IPC_STAT, sem_opts); - if ((stat_buf.sem_perm.mode & 0xfff) != permissions) { - stat_buf.sem_perm.mode &= ~0xfff; - stat_buf.sem_perm.mode |= permissions; - semctl(sem_id, 0, IPC_SET, sem_opts); - } -} - -static const int kInternalTimeout = 5; /* seconds */ - -static int -get_max_tickets(int sem_id) -{ - int ret = semctl(sem_id, SI_SEM_CONFIGURED_TICKETS, GETVAL); - if (ret == -1) { - rb_raise(eInternal, "error getting max ticket count, errno: %d (%s)", errno, strerror(errno)); - } - return ret; -} - -static int -perform_semop(int sem_id, short index, short op, short flags, struct timespec *ts) -{ - struct sembuf buf = { 0 }; - - buf.sem_num = index; - buf.sem_op = op; - buf.sem_flg = flags; - - if (ts) { - return semtimedop(sem_id, &buf, 1, ts); - } else { - return semop(sem_id, &buf, 1); - } -} - -typedef struct { - int sem_id; - int tickets; -} update_ticket_count_t; - -static VALUE -update_ticket_count(update_ticket_count_t *tc) -{ - short delta; - struct timespec ts = { 0 }; - ts.tv_sec = kInternalTimeout; - - if (get_max_tickets(tc->sem_id) != tc->tickets) { - delta = tc->tickets - get_max_tickets(tc->sem_id); - - if (perform_semop(tc->sem_id, SI_SEM_TICKETS, delta, 0, &ts) == -1) { - rb_raise(eInternal, "error setting ticket count, errno: %d (%s)", errno, strerror(errno)); - } - - if (semctl(tc->sem_id, SI_SEM_CONFIGURED_TICKETS, SETVAL, tc->tickets) == -1) { - rb_raise(eInternal, "error updating max ticket count, errno: %d (%s)", errno, strerror(errno)); - } - } - - return Qnil; -} - -static void -configure_tickets(int sem_id, int tickets, int should_initialize) -{ - struct timespec ts = { 0 }; - unsigned short init_vals[SI_NUM_SEMAPHORES]; - struct timeval start_time, cur_time; - update_ticket_count_t tc; - int state; - - if (should_initialize) { - init_vals[SI_SEM_TICKETS] = init_vals[SI_SEM_CONFIGURED_TICKETS] = tickets; - init_vals[SI_SEM_LOCK] = 1; - if (semctl(sem_id, 0, SETALL, init_vals) == -1) { - raise_semian_syscall_error("semctl()", errno); - } - } else if (tickets > 0) { - /* it's possible that we haven't actually initialized the - semaphore structure yet - wait a bit in that case */ - if (get_max_tickets(sem_id) == 0) { - gettimeofday(&start_time, NULL); - while (get_max_tickets(sem_id) == 0) { - usleep(10000); /* 10ms */ - gettimeofday(&cur_time, NULL); - if ((cur_time.tv_sec - start_time.tv_sec) > kInternalTimeout) { - rb_raise(eInternal, "timeout waiting for semaphore initialization"); - } - } - } - - /* - If the current max ticket count is not the same as the requested ticket - count, we need to resize the count. We do this by adding the delta of - (tickets - current_max_tickets) to the semaphore value. - */ - if (get_max_tickets(sem_id) != tickets) { - ts.tv_sec = kInternalTimeout; - - if (perform_semop(sem_id, SI_SEM_LOCK, -1, SEM_UNDO, &ts) == -1) { - raise_semian_syscall_error("error acquiring internal semaphore lock, semtimedop()", errno); - } - - tc.sem_id = sem_id; - tc.tickets = tickets; - rb_protect((VALUE (*)(VALUE)) update_ticket_count, (VALUE) &tc, &state); - - if (perform_semop(sem_id, SI_SEM_LOCK, 1, SEM_UNDO, NULL) == -1) { - raise_semian_syscall_error("error releasing internal semaphore lock, semop()", errno); - } - - if (state) { - rb_jump_tag(state); - } - } - } -} - -static int -create_semaphore(int key, int permissions, int *created) -{ - int semid = 0; - int flags = 0; - - *created = 0; - flags = IPC_EXCL | IPC_CREAT | FIX2LONG(permissions); - - semid = semget(key, SI_NUM_SEMAPHORES, flags); - if (semid >= 0) { - *created = 1; - } else if (semid == -1 && errno == EEXIST) { - flags &= ~IPC_EXCL; - semid = semget(key, SI_NUM_SEMAPHORES, flags); - } - return semid; -} - -static int -get_semaphore(int key) -{ - return semget(key, SI_NUM_SEMAPHORES, 0); -} - -/* - * call-seq: - * Semian::Resource.new(id, tickets, permissions, default_timeout) -> resource - * - * Creates a new Resource. Do not create resources directly. Use Semian.register. - */ -static VALUE -semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE permissions, VALUE default_timeout) -{ - key_t key; - int created = 0; - semian_resource_t *res = NULL; - const char *id_str = NULL; - - if (TYPE(id) != T_SYMBOL && TYPE(id) != T_STRING) { - rb_raise(rb_eTypeError, "id must be a symbol or string"); - } - if (TYPE(tickets) == T_FLOAT) { - rb_warn("semian ticket value %f is a float, converting to fixnum", RFLOAT_VALUE(tickets)); - tickets = INT2FIX((int) RFLOAT_VALUE(tickets)); - } - Check_Type(tickets, T_FIXNUM); - Check_Type(permissions, T_FIXNUM); - if (TYPE(default_timeout) != T_FIXNUM && TYPE(default_timeout) != T_FLOAT) { - rb_raise(rb_eTypeError, "expected numeric type for default_timeout"); - } - if (FIX2LONG(tickets) < 0 || FIX2LONG(tickets) > system_max_semaphore_count) { - rb_raise(rb_eArgError, "ticket count must be a non-negative value and less than %d", system_max_semaphore_count); - } - if (NUM2DBL(default_timeout) < 0) { - rb_raise(rb_eArgError, "default timeout must be non-negative value"); - } - - if (TYPE(id) == T_SYMBOL) { - id_str = rb_id2name(rb_to_id(id)); - } else if (TYPE(id) == T_STRING) { - id_str = RSTRING_PTR(id); - } - TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); - key = generate_key(id_str); - ms_to_timespec(NUM2DBL(default_timeout) * 1000, &res->timeout); - res->name = strdup(id_str); - - res->sem_id = FIX2LONG(tickets) == 0 ? get_semaphore(key) : create_semaphore(key, permissions, &created); - if (res->sem_id == -1) { - raise_semian_syscall_error("semget()", errno); - } - - configure_tickets(res->sem_id, FIX2LONG(tickets), created); - - set_semaphore_permissions(res->sem_id, FIX2LONG(permissions)); - - return self; -} - -static VALUE -cleanup_semian_resource_acquire(VALUE self) -{ - semian_resource_t *res = NULL; - TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); - if (perform_semop(res->sem_id, SI_SEM_TICKETS, 1, SEM_UNDO, NULL) == -1) { - res->error = errno; - } - return Qnil; -} - -static void * -acquire_semaphore_without_gvl(void *p) -{ - semian_resource_t *res = (semian_resource_t *) p; - res->error = 0; - if (perform_semop(res->sem_id, SI_SEM_TICKETS, -1, SEM_UNDO, &res->timeout) == -1) { - res->error = errno; - } - return NULL; -} - -/* - * call-seq: - * resource.acquire(timeout: default_timeout) { ... } -> result of the block - * - * Acquires a resource. The call will block for timeout seconds if a ticket - * is not available. If no ticket is available within the timeout period, Semian::TimeoutError - * will be raised. - * - * If no timeout argument is provided, the default timeout passed to Semian.register will be used. - * - */ -static VALUE -semian_resource_acquire(int argc, VALUE *argv, VALUE self) -{ - semian_resource_t *self_res = NULL; - semian_resource_t res = { 0 }; - - if (!rb_block_given_p()) { - rb_raise(rb_eArgError, "acquire requires a block"); - } - - TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, self_res); - res = *self_res; - - /* allow the default timeout to be overridden by a "timeout" param */ - if (argc == 1 && TYPE(argv[0]) == T_HASH) { - VALUE timeout = rb_hash_aref(argv[0], ID2SYM(id_timeout)); - if (TYPE(timeout) != T_NIL) { - if (TYPE(timeout) != T_FLOAT && TYPE(timeout) != T_FIXNUM) { - rb_raise(rb_eArgError, "timeout parameter must be numeric"); - } - ms_to_timespec(NUM2DBL(timeout) * 1000, &res.timeout); - } - } else if (argc > 0) { - rb_raise(rb_eArgError, "invalid arguments"); - } - - /* release the GVL to acquire the semaphore */ - WITHOUT_GVL(acquire_semaphore_without_gvl, &res, RUBY_UBF_IO, NULL); - if (res.error != 0) { - if (res.error == EAGAIN) { - rb_raise(eTimeout, "timed out waiting for resource '%s'", res.name); - } else { - raise_semian_syscall_error("semop()", res.error); - } - } - - return rb_ensure(rb_yield, self, cleanup_semian_resource_acquire, self); -} - -/* - * call-seq: - * resource.destroy() -> true - * - * Destroys a resource. This method will destroy the underlying SysV semaphore. - * If there is any code in other threads or processes blocking or using the resource - * they will likely raise. - * - * Use this method very carefully. - */ -static VALUE -semian_resource_destroy(VALUE self) -{ - semian_resource_t *res = NULL; - - TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); - if (semctl(res->sem_id, 0, IPC_RMID) == -1) { - raise_semian_syscall_error("semctl()", errno); - } - - return Qtrue; -} - -/* - * call-seq: - * resource.count -> count - * - * Returns the current ticket count for a resource. - */ -static VALUE -semian_resource_count(VALUE self) -{ - int ret; - semian_resource_t *res = NULL; - - TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); - ret = semctl(res->sem_id, 0, GETVAL); - if (ret == -1) { - raise_semian_syscall_error("semctl()", errno); - } - - return LONG2FIX(ret); -} - -/* - * call-seq: - * resource.semid -> id - * - * Returns the SysV semaphore id of a resource. - */ -static VALUE -semian_resource_id(VALUE self) -{ - semian_resource_t *res = NULL; - TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); - return LONG2FIX(res->sem_id); -} +#include void Init_semian() { diff --git a/ext/semian/semian.h b/ext/semian/semian.h new file mode 100644 index 000000000..dc365fb90 --- /dev/null +++ b/ext/semian/semian.h @@ -0,0 +1,34 @@ +/* +System, 3rd party, and project includes + +Implements Init_semian, which is used as C/Ruby entrypoint. +*/ + +#ifndef SEMIAN_H +#define SEMIAN_H + +// System includes +#include +#include +#include +#include +#include +#include +#include + +// 3rd party includes +#include +#include +#include +#include + +//semian includes +#include +#include +#include +#include +#include + +void Init_semian(); + +#endif //SEMIAN_H diff --git a/ext/semian/semian_resource.c b/ext/semian/semian_resource.c new file mode 100644 index 000000000..277667d65 --- /dev/null +++ b/ext/semian/semian_resource.c @@ -0,0 +1,218 @@ +#include + +// "Private" function forward declarations +static VALUE +cleanup_semian_resource_acquire(VALUE self); + +static int +check_tickets_arg(VALUE tickets); + +static long +check_permissions_arg(VALUE permissions); + +static const +char *check_id_arg(VALUE id); + +static double +check_default_timeout_arg(VALUE default_timeout); + +static void +ms_to_timespec(long ms, struct timespec *ts); + +VALUE +semian_resource_acquire(int argc, VALUE *argv, VALUE self) +{ + semian_resource_t *self_res = NULL; + semian_resource_t res = { 0 }; + + if (!rb_block_given_p()) { + rb_raise(rb_eArgError, "acquire requires a block"); + } + + TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, self_res); + res = *self_res; + + /* allow the default timeout to be overridden by a "timeout" param */ + if (argc == 1 && TYPE(argv[0]) == T_HASH) { + VALUE timeout = rb_hash_aref(argv[0], ID2SYM(id_timeout)); + if (TYPE(timeout) != T_NIL) { + if (TYPE(timeout) != T_FLOAT && TYPE(timeout) != T_FIXNUM) { + rb_raise(rb_eArgError, "timeout parameter must be numeric"); + } + ms_to_timespec(NUM2DBL(timeout) * 1000, &res.timeout); + } + } else if (argc > 0) { + rb_raise(rb_eArgError, "invalid arguments"); + } + + /* release the GVL to acquire the semaphore */ + acquire_semaphore_without_gvl(&res); + if (res.error != 0) { + if (res.error == EAGAIN) { + rb_raise(eTimeout, "timed out waiting for resource '%s'", res.name); + } else { + raise_semian_syscall_error("semop()", res.error); + } + } + + return rb_ensure(rb_yield, self, cleanup_semian_resource_acquire, self); +} + +VALUE +semian_resource_destroy(VALUE self) +{ + semian_resource_t *res = NULL; + + TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); + if (semctl(res->sem_id, SI_NUM_SEMAPHORES, IPC_RMID) == -1) { + raise_semian_syscall_error("semctl()", errno); + } + + return Qtrue; +} + +VALUE +semian_resource_count(VALUE self) +{ + int ret; + semian_resource_t *res = NULL; + + TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); + ret = semctl(res->sem_id, SI_SEM_TICKETS, GETVAL); + if (ret == -1) { + raise_semian_syscall_error("semctl()", errno); + } + + return LONG2FIX(ret); +} + +VALUE +semian_resource_id(VALUE self) +{ + semian_resource_t *res = NULL; + TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); + return LONG2FIX(res->sem_id); +} + +VALUE +semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE permissions, VALUE default_timeout) +{ + key_t key; + int c_permissions; + double c_timeout; + int c_tickets; + int created = 0; + semian_resource_t *res = NULL; + const char *c_id_str = NULL; + + // Check and cast arguments + c_tickets = check_tickets_arg(tickets); + c_permissions = check_permissions_arg(permissions); + c_id_str = check_id_arg(id); + c_timeout = check_default_timeout_arg(default_timeout); + + // Build semian resource structure + TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); + + // Populate struct fields + ms_to_timespec(c_timeout * 1000, &res->timeout); + res->name = strdup(c_id_str); + + // Get or create semaphore set + // note that tickets = 0 will be used to acquire a semaphore set after it's been created elswhere + key = generate_sem_set_key(c_id_str); + res->sem_id = c_tickets == 0 ? get_semaphore(key) : create_semaphore(key, c_permissions, &created); + if (res->sem_id == -1) { + raise_semian_syscall_error("semget()", errno); + } + + set_semaphore_permissions(res->sem_id, c_permissions); + + // Configure semaphore ticket counts + configure_tickets(res->sem_id, c_tickets, created); + + return self; +} + +/* +********************************************************************************************************* +"Private" + +These functions are specific to semian resource interals and may not be called by other files +********************************************************************************************************* +*/ + +static VALUE +cleanup_semian_resource_acquire(VALUE self) +{ + semian_resource_t *res = NULL; + TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); + if (perform_semop(res->sem_id, SI_SEM_TICKETS, 1, SEM_UNDO, NULL) == -1) { + res->error = errno; + } + return Qnil; +} + +static long check_permissions_arg(VALUE permissions) +{ + Check_Type(permissions, T_FIXNUM); + return FIX2LONG(permissions); +} + +static int check_tickets_arg(VALUE tickets) +{ + int c_tickets; + + if (TYPE(tickets) != T_NIL) { + if (TYPE(tickets) == T_FLOAT) { + rb_warn("semian ticket value %f is a float, converting to fixnum", RFLOAT_VALUE(tickets)); + tickets = INT2FIX((int) RFLOAT_VALUE(tickets)); + } + Check_Type(tickets, T_FIXNUM); + + if (FIX2LONG(tickets) < 0 || FIX2LONG(tickets) > system_max_semaphore_count) { + rb_raise(rb_eArgError, "ticket count must be a non-negative value and less than %d", system_max_semaphore_count); + } + c_tickets = FIX2LONG(tickets); + } else { + c_tickets = -1; + } + + return c_tickets; +} + +static const char* check_id_arg(VALUE id) +{ + const char *c_id_str = NULL; + + if (TYPE(id) != T_SYMBOL && TYPE(id) != T_STRING) { + rb_raise(rb_eTypeError, "id must be a symbol or string"); + } + if (TYPE(id) == T_SYMBOL) { + c_id_str = rb_id2name(rb_to_id(id)); + } else if (TYPE(id) == T_STRING) { + c_id_str = RSTRING_PTR(id); + } + + return c_id_str; +} + +static double check_default_timeout_arg(VALUE default_timeout) +{ + if (TYPE(default_timeout) != T_FIXNUM && TYPE(default_timeout) != T_FLOAT) { + rb_raise(rb_eTypeError, "expected numeric type for default_timeout"); + } + + if (NUM2DBL(default_timeout) < 0) { + rb_raise(rb_eArgError, "default timeout must be non-negative value"); + } + return NUM2DBL(default_timeout); +} + +static void +ms_to_timespec(long ms, struct timespec *ts) +{ + ts->tv_sec = ms / 1000; + ts->tv_nsec = (ms % 1000) * 1000000; +} + diff --git a/ext/semian/semian_resource.h b/ext/semian/semian_resource.h new file mode 100644 index 000000000..5e208b93a --- /dev/null +++ b/ext/semian/semian_resource.h @@ -0,0 +1,69 @@ +/* +For core semian resource functions exposed directly to ruby. + +Functions here are associated with rubyland operations. +*/ +#ifndef SEMIAN_RESOURCE_H +#define SEMIAN_RESOURCE_H + +#include + +// Ruby variables +ID id_timeout; +int system_max_semaphore_count; + +/* + * call-seq: + * Semian::Resource.new(id, tickets, permissions, default_timeout) -> resource + * + * Creates a new Resource. Do not create resources directly. Use Semian.register. + */ +VALUE +semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE permissions, VALUE default_timeout); + +/* + * call-seq: + * resource.acquire(timeout: default_timeout) { ... } -> result of the block + * + * Acquires a resource. The call will block for timeout seconds if a ticket + * is not available. If no ticket is available within the timeout period, Semian::TimeoutError + * will be raised. + * + * If no timeout argument is provided, the default timeout passed to Semian.register will be used. + * + */ +VALUE +semian_resource_acquire(int argc, VALUE *argv, VALUE self); + +/* + * call-seq: + * resource.destroy() -> true + * + * Destroys a resource. This method will destroy the underlying SysV semaphore. + * If there is any code in other threads or processes blocking or using the resource + * they will likely raise. + * + * Use this method very carefully. + */ +VALUE +semian_resource_destroy(VALUE self); + +/* + * call-seq: + * resource.count -> count + * + * Returns the current ticket count for a resource. + */ +VALUE +semian_resource_count(VALUE self); + +/* + * call-seq: + * resource.semid -> id + * + * Returns the SysV semaphore id of a resource. + */ +VALUE +semian_resource_id(VALUE self); + +#endif //SEMIAN_RESOURCE_H diff --git a/ext/semian/semian_resource_alloc.c b/ext/semian/semian_resource_alloc.c new file mode 100644 index 000000000..60383d43c --- /dev/null +++ b/ext/semian/semian_resource_alloc.c @@ -0,0 +1,12 @@ +#include + +const rb_data_type_t +semian_resource_type = { + "semian_resource", + { + semian_resource_mark, + semian_resource_free, + semian_resource_memsize + }, + NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY +}; diff --git a/ext/semian/semian_resource_alloc.h b/ext/semian/semian_resource_alloc.h new file mode 100644 index 000000000..f054c2968 --- /dev/null +++ b/ext/semian/semian_resource_alloc.h @@ -0,0 +1,47 @@ +/* +For memory management operations of semian resources. +*/ +#ifndef SEMIAN_RESOURCE_ALLOC_H +#define SEMIAN_RESOURCE_ALLOC_H + +#include + +// Semian resource rep for GC purposes +const rb_data_type_t +semian_resource_type; + +// Required, due to interface, but uneeded in implementation. +static inline void +semian_resource_mark(void *ptr) +{ + /* noop */ +} + +// Clean up a semian resource to prevent memory leakage +static inline void +semian_resource_free(void *ptr) +{ + semian_resource_t *res = (semian_resource_t *) ptr; + if (res->name) { + free(res->name); + res->name = NULL; + } + xfree(res); +} + +// Get memory size of the semian resource struct +static inline size_t +semian_resource_memsize(const void *ptr) +{ + return sizeof(semian_resource_t); +} + +// Allocate heap space for semian resource struct +static inline VALUE +semian_resource_alloc(VALUE klass) +{ + semian_resource_t *res; + VALUE obj = TypedData_Make_Struct(klass, semian_resource_t, &semian_resource_type, res); + return obj; +} +#endif //SEMIAN_RESOURCE_ALLOC_H diff --git a/ext/semian/semian_tickets.c b/ext/semian/semian_tickets.c new file mode 100644 index 000000000..9c26179fd --- /dev/null +++ b/ext/semian/semian_tickets.c @@ -0,0 +1,106 @@ +#include + +// "Private" function forward declarations +static void +initialize_tickets(int sem_id, int tickets); + +static void +configure_static_tickets(int sem_id, int tickets); + +VALUE +update_ticket_count(update_ticket_count_t *tc) +{ + short delta; + struct timespec ts = { 0 }; + ts.tv_sec = INTERNAL_TIMEOUT; + + if (get_sem_val(tc->sem_id, SI_SEM_CONFIGURED_TICKETS) != tc->tickets) { + delta = tc->tickets - get_sem_val(tc->sem_id, SI_SEM_CONFIGURED_TICKETS); + + if (perform_semop(tc->sem_id, SI_SEM_TICKETS, delta, 0, &ts) == -1) { + rb_raise(eInternal, "error setting ticket count, errno: %d (%s)", errno, strerror(errno)); + } + + if (semctl(tc->sem_id, SI_SEM_CONFIGURED_TICKETS, SETVAL, tc->tickets) == -1) { + rb_raise(eInternal, "error updating configured ticket count, errno: %d (%s)", errno, strerror(errno)); + } + } + + return Qnil; +} +void +configure_tickets(int sem_id, int tickets, int should_initialize) +{ + if (should_initialize) { + initialize_tickets(sem_id, tickets); + } + + if (tickets > 0) { + configure_static_tickets(sem_id, tickets); + } +} + +/* +********************************************************************************************************* +"Private" + +These functions are specific to semian ticket interals and may not be called by other files +********************************************************************************************************* +*/ + +static void +initialize_tickets(int sem_id, int tickets) +{ + unsigned short init_vals[SI_NUM_SEMAPHORES]; + + if (tickets > 0) { + init_vals[SI_SEM_TICKETS] = init_vals[SI_SEM_CONFIGURED_TICKETS] = tickets; + } + init_vals[SI_SEM_LOCK] = 1; + if (semctl(sem_id, 0, SETALL, init_vals) == -1) { + raise_semian_syscall_error("semctl()", errno); + } +} + +static void +configure_static_tickets(int sem_id, int tickets) +{ + int state; + struct timeval start_time, cur_time; + update_ticket_count_t tc; + + /* it's possible that we haven't actually initialized the + semaphore structure yet - wait a bit in that case */ + if (get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS) == 0) { + gettimeofday(&start_time, NULL); + while (get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS) == 0) { + usleep(10000); /* 10ms */ + gettimeofday(&cur_time, NULL); + if ((cur_time.tv_sec - start_time.tv_sec) > INTERNAL_TIMEOUT) { + rb_raise(eInternal, "timeout waiting for semaphore initialization"); + } + } + } + + /* + If the current configured ticket count is not the same as the requested ticket + count, we need to resize the count. We do this by adding the delta of + (tickets - current_configured_tickets) to the semaphore value. + */ + if (get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS) != tickets) { + + sem_meta_lock(sem_id); + + tc.sem_id = sem_id; + tc.tickets = tickets; + rb_protect((VALUE (*)(VALUE)) update_ticket_count, (VALUE) &tc, &state); + + sem_meta_unlock(sem_id); + + if (state) { + rb_jump_tag(state); + } + } +} + + diff --git a/ext/semian/semian_tickets.h b/ext/semian/semian_tickets.h new file mode 100644 index 000000000..9ef022aa5 --- /dev/null +++ b/ext/semian/semian_tickets.h @@ -0,0 +1,17 @@ +/* +For logic specific to manipulating semian ticket counts +*/ +#ifndef SEMIAN_TICKETS_H +#define SEMIAN_TICKETS_H + +#include + +// Update the ticket count for static ticket tracking +VALUE +update_ticket_count(update_ticket_count_t *tc); + +// Set initial ticket values upon resource creation +void +configure_tickets(int sem_id, int tickets, int should_initialize); + +#endif // SEMIAN_TICKETS_H diff --git a/ext/semian/semian_types.h b/ext/semian/semian_types.h new file mode 100644 index 000000000..70857fd00 --- /dev/null +++ b/ext/semian/semian_types.h @@ -0,0 +1,32 @@ +/* +For custom type definitions specific to semian +*/ +#ifndef SEMIAN_TYPES_H +#define SEMIAN_TYPES_H + +// For sysV semop syscals +// see man semop +union semun { + int val; /* Value for SETVAL */ + struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ + unsigned short *array; /* Array for GETALL, SETALL */ + struct seminfo *__buf; /* Buffer for IPC_INFO + (Linux-specific) */ +}; + +// FIXME +// Why is this needed? (i don't think it is) +typedef struct { + int sem_id; + int tickets; +} update_ticket_count_t; + +// Internal semaphore structure +typedef struct { + int sem_id; + struct timespec timeout; + int error; + char *name; +} semian_resource_t; + +#endif // SEMIAN_TYPES_H diff --git a/ext/semian/semset.c b/ext/semian/semset.c new file mode 100644 index 000000000..fe58efe6f --- /dev/null +++ b/ext/semian/semset.c @@ -0,0 +1,69 @@ +#include + +const char *SEMINDEX_STRING[] = { + FOREACH_SEMINDEX(GENERATE_STRING) +}; + +void +raise_semian_syscall_error(const char *syscall, int error_num) +{ + rb_raise(eSyscall, "%s failed, errno: %d (%s)", syscall, error_num, strerror(error_num)); +} + +key_t +generate_sem_set_key(const char *name) +{ + char semset_size_key[20]; + char *uniq_id_str; + + // It is necessary for the cardinatily of the semaphore set to be part of the key + // or else sem_get will complain that we have requested an incorrect number of sems + // for the desired key, and have changed the number of semaphores for a given key + sprintf(semset_size_key, "_NUM_SEMS_%d", SI_NUM_SEMAPHORES); + uniq_id_str = malloc(strlen(name)+strlen(semset_size_key)+1); + strcpy(uniq_id_str, name); + strcat(uniq_id_str, semset_size_key); + + union { + unsigned char str[SHA_DIGEST_LENGTH]; + key_t key; + } digest; + SHA1((const unsigned char *) uniq_id_str, strlen(uniq_id_str), digest.str); + free(uniq_id_str); + /* TODO: compile-time assertion that sizeof(key_t) > SHA_DIGEST_LENGTH */ + return digest.key; +} + +void +set_semaphore_permissions(int sem_id, long permissions) +{ + union semun sem_opts; + struct semid_ds stat_buf; + + sem_opts.buf = &stat_buf; + semctl(sem_id, 0, IPC_STAT, sem_opts); + if ((stat_buf.sem_perm.mode & 0xfff) != permissions) { + stat_buf.sem_perm.mode &= ~0xfff; + stat_buf.sem_perm.mode |= permissions; + semctl(sem_id, 0, IPC_SET, sem_opts); + } +} + +int +create_semaphore(int key, long permissions, int *created) +{ + int semid = 0; + int flags = 0; + + *created = 0; + flags = IPC_EXCL | IPC_CREAT | permissions; + + semid = semget(key, SI_NUM_SEMAPHORES, flags); + if (semid >= 0) { + *created = 1; + } else if (semid == -1 && errno == EEXIST) { + flags &= ~IPC_EXCL; + semid = semget(key, SI_NUM_SEMAPHORES, flags); + } + return semid; +} diff --git a/ext/semian/semset.h b/ext/semian/semset.h new file mode 100644 index 000000000..b6e8aba3c --- /dev/null +++ b/ext/semian/semset.h @@ -0,0 +1,145 @@ +/* +For manipulating the semian's semaphore set + +Semian semaphore operations and initialization, +and functions associated directly weth semops. +*/ +#ifndef SEMIAN_SEMSET_H +#define SEMIAN_SEMSET_H + +#include + +// Defines for ruby threading primitives +#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) +// 2.0 +#include +#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b)) +#elif defined(HAVE_RB_THREAD_BLOCKING_REGION) + // 1.9 +typedef VALUE (*my_blocking_fn_t)(void*); +#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b)) +#endif + +// Time to wait for timed ops to complete +#define INTERNAL_TIMEOUT 5 // seconds + +VALUE eSyscall, eTimeout, eInternal; + +// Here we define an enum value and string representation of each semaphore +// This allows us to key the sem value and string rep in sync easily +// utilizing pre-processor macros. +// SI_SEM_TICKETS semaphore for the tickets currently issued +// SI_SEM_CONFIGURED_TICKETS semaphore to track the desired number of tickets available for issue +// SI_SEM_LOCK metadata lock to act as a mutex, ensuring thread-safety for updating other semaphores +// SI_NUM_SEMAPHORES always leave this as last entry for count to be accurate +#define FOREACH_SEMINDEX(SEMINDEX) \ + SEMINDEX(SI_SEM_TICKETS) \ + SEMINDEX(SI_SEM_CONFIGURED_TICKETS) \ + SEMINDEX(SI_SEM_LOCK) \ + SEMINDEX(SI_NUM_SEMAPHORES) \ + +#define GENERATE_ENUM(ENUM) ENUM, +#define GENERATE_STRING(STRING) #STRING, + +// Generate enum for sem indices +enum SEMINDEX_ENUM { + FOREACH_SEMINDEX(GENERATE_ENUM) +}; + +// Generate string rep for sem indices for debugging puproses +extern const char *SEMINDEX_STRING[]; + +// Helper for syscall verbose debugging +void +raise_semian_syscall_error(const char *syscall, int error_num); + +// Genurates a unique key for the semaphore from the resource id +key_t +generate_sem_set_key(const char *name); + +// Set semaphore UNIX octal permissions +void +set_semaphore_permissions(int sem_id, long permissions); + +// Create a new sysV IPC semaphore set +int +create_semaphore(int key, long permissions, int *created); + +// Wrapper to performs a semop call +// The call may be timed or untimed +static inline int +perform_semop(int sem_id, short index, short op, short flags, struct timespec *ts) +{ + struct sembuf buf = { 0 }; + + buf.sem_num = index; + buf.sem_op = op; + buf.sem_flg = flags; + + if (ts) { + return semtimedop(sem_id, &buf, 1, ts); + } else { + return semop(sem_id, &buf, 1); + } +} + +// Retrieve the current number of tickets in a semaphore by its semaphore index +static inline int +get_sem_val(int sem_id, int sem_index) +{ + int ret = semctl(sem_id, sem_index, GETVAL); + if (ret == -1) { + rb_raise(eInternal, "error getting value of %s, errno: %d (%s)", SEMINDEX_STRING[sem_index], errno, strerror(errno)); + } + return ret; +} + +// Obtain an exclusive lock on the semaphore set critical section +static inline void +sem_meta_lock(int sem_id) +{ + struct timespec ts = { 0 }; + ts.tv_sec = INTERNAL_TIMEOUT; + + if (perform_semop(sem_id, SI_SEM_LOCK, -1, SEM_UNDO, &ts) == -1) { + raise_semian_syscall_error("error acquiring internal semaphore lock, semtimedop()", errno); + } +} + +// Release an exclusive lock on the semaphore set critical section +static inline void +sem_meta_unlock(int sem_id) +{ + if (perform_semop(sem_id, SI_SEM_LOCK, 1, SEM_UNDO, NULL) == -1) { + raise_semian_syscall_error("error releasing internal semaphore lock, semop()", errno); + } +} + +// Retrieve a semaphore's ID from its key +static inline int +get_semaphore(int key) +{ + return semget(key, SI_NUM_SEMAPHORES, 0); +} + +// WARNING: Never call directly +// Decrements the ticket semaphore within the semaphore set +static inline void * +acquire_semaphore(void *p) +{ + semian_resource_t *res = (semian_resource_t *) p; + res->error = 0; + if (perform_semop(res->sem_id, SI_SEM_TICKETS, -1, SEM_UNDO, &res->timeout) == -1) { + res->error = errno; + } + return NULL; +} + +// Acquire a ticket with the ruby Global VM lock released +static inline void * +acquire_semaphore_without_gvl(void *p) +{ + WITHOUT_GVL(acquire_semaphore, p, RUBY_UBF_IO, NULL); + return NULL; +} +#endif // SEMIAN_SEMSET_H diff --git a/semian.gemspec b/semian.gemspec index d3f70efad..6fbd61b1d 100644 --- a/semian.gemspec +++ b/semian.gemspec @@ -12,7 +12,7 @@ Gem::Specification.new do |s| across process boundaries with SysV semaphores. DOC s.homepage = 'https://github.com/shopify/semian' - s.authors = ['Scott Francis', 'Simon Eskildsen'] + s.authors = ['Scott Francis', 'Simon Eskildsen', 'Dale Hamel'] s.email = 'scott.francis@shopify.com' s.license = 'MIT'