diff --git a/.gitignore b/.gitignore
index 7631db29..c1e5b137 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,5 @@
/html/
Gemfile.lock
vendor/
+*.swp
+*.swo
diff --git a/.travis.yml b/.travis.yml
index 9b0871f4..350c4b8f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,6 +3,7 @@ language: ruby
sudo: true
before_install:
+ - gem update --system
- gem install bundler
- scripts/install_toxiproxy.sh
diff --git a/ext/semian/resource.c b/ext/semian/resource.c
new file mode 100644
index 00000000..b4f87519
--- /dev/null
+++ b/ext/semian/resource.c
@@ -0,0 +1,262 @@
+#include "resource.h"
+
+// "Private" function forward declarations
+static VALUE
+cleanup_semian_resource_acquire(VALUE self);
+
+static int
+check_tickets_arg(VALUE tickets);
+
+static long
+check_permissions_arg(VALUE permissions);
+
+static const
+char *check_id_arg(VALUE id);
+
+static double
+check_default_timeout_arg(VALUE default_timeout);
+
+static void
+ms_to_timespec(long ms, struct timespec *ts);
+
+static const rb_data_type_t
+semian_resource_type;
+
+
+VALUE
+semian_resource_acquire(int argc, VALUE *argv, VALUE self)
+{
+ semian_resource_t *self_res = NULL;
+ semian_resource_t res = { 0 };
+
+ if (!rb_block_given_p()) {
+ rb_raise(rb_eArgError, "acquire requires a block");
+ }
+
+ TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, self_res);
+ res = *self_res;
+
+ /* allow the default timeout to be overridden by a "timeout" param */
+ if (argc == 1 && TYPE(argv[0]) == T_HASH) {
+ VALUE timeout = rb_hash_aref(argv[0], ID2SYM(id_timeout));
+ if (TYPE(timeout) != T_NIL) {
+ if (TYPE(timeout) != T_FLOAT && TYPE(timeout) != T_FIXNUM) {
+ rb_raise(rb_eArgError, "timeout parameter must be numeric");
+ }
+ ms_to_timespec(NUM2DBL(timeout) * 1000, &res.timeout);
+ }
+ } else if (argc > 0) {
+ rb_raise(rb_eArgError, "invalid arguments");
+ }
+
+ /* release the GVL to acquire the semaphore */
+ acquire_semaphore_without_gvl(&res);
+ if (res.error != 0) {
+ if (res.error == EAGAIN) {
+ rb_raise(eTimeout, "timed out waiting for resource '%s'", res.name);
+ } else {
+ raise_semian_syscall_error("semop()", res.error);
+ }
+ }
+
+ return rb_ensure(rb_yield, self, cleanup_semian_resource_acquire, self);
+}
+
+VALUE
+semian_resource_destroy(VALUE self)
+{
+ semian_resource_t *res = NULL;
+
+ TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
+ if (semctl(res->sem_id, SI_NUM_SEMAPHORES, IPC_RMID) == -1) {
+ raise_semian_syscall_error("semctl()", errno);
+ }
+
+ return Qtrue;
+}
+
+VALUE
+semian_resource_count(VALUE self)
+{
+ int ret;
+ semian_resource_t *res = NULL;
+
+ TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
+ ret = semctl(res->sem_id, SI_SEM_TICKETS, GETVAL);
+ if (ret == -1) {
+ raise_semian_syscall_error("semctl()", errno);
+ }
+
+ return LONG2FIX(ret);
+}
+
+VALUE
+semian_resource_id(VALUE self)
+{
+ semian_resource_t *res = NULL;
+ TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
+ return LONG2FIX(res->sem_id);
+}
+
+VALUE
+semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE permissions, VALUE default_timeout)
+{
+ key_t key;
+ int c_permissions;
+ double c_timeout;
+ int c_tickets;
+ int created = 0;
+ semian_resource_t *res = NULL;
+ const char *c_id_str = NULL;
+
+ // Check and cast arguments
+ c_tickets = check_tickets_arg(tickets);
+ c_permissions = check_permissions_arg(permissions);
+ c_id_str = check_id_arg(id);
+ c_timeout = check_default_timeout_arg(default_timeout);
+
+ // Build semian resource structure
+ TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
+
+ // Populate struct fields
+ ms_to_timespec(c_timeout * 1000, &res->timeout);
+ res->name = strdup(c_id_str);
+
+ // Get or create semaphore set
+ // note that tickets = 0 will be used to acquire a semaphore set after it's been created elswhere
+ key = generate_sem_set_key(c_id_str);
+ res->sem_id = c_tickets == 0 ? get_semaphore(key) : create_semaphore(key, c_permissions, &created);
+ if (res->sem_id == -1) {
+ raise_semian_syscall_error("semget()", errno);
+ }
+
+ set_semaphore_permissions(res->sem_id, c_permissions);
+
+ // Configure semaphore ticket counts
+ configure_tickets(res->sem_id, c_tickets, created);
+
+ return self;
+}
+
+VALUE
+semian_resource_alloc(VALUE klass)
+{
+ semian_resource_t *res;
+ VALUE obj = TypedData_Make_Struct(klass, semian_resource_t, &semian_resource_type, res);
+ return obj;
+}
+/*
+*********************************************************************************************************
+"Private"
+
+These functions are specific to semian resource interals and may not be called by other files
+*********************************************************************************************************
+*/
+
+static VALUE
+cleanup_semian_resource_acquire(VALUE self)
+{
+ semian_resource_t *res = NULL;
+ TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
+ if (perform_semop(res->sem_id, SI_SEM_TICKETS, 1, SEM_UNDO, NULL) == -1) {
+ res->error = errno;
+ }
+ return Qnil;
+}
+
+static long check_permissions_arg(VALUE permissions)
+{
+ Check_Type(permissions, T_FIXNUM);
+ return FIX2LONG(permissions);
+}
+
+static int check_tickets_arg(VALUE tickets)
+{
+ int c_tickets;
+
+ if (TYPE(tickets) != T_NIL) {
+ if (TYPE(tickets) == T_FLOAT) {
+ rb_warn("semian ticket value %f is a float, converting to fixnum", RFLOAT_VALUE(tickets));
+ tickets = INT2FIX((int) RFLOAT_VALUE(tickets));
+ }
+ Check_Type(tickets, T_FIXNUM);
+
+ if (FIX2LONG(tickets) < 0 || FIX2LONG(tickets) > system_max_semaphore_count) {
+ rb_raise(rb_eArgError, "ticket count must be a non-negative value and less than %d", system_max_semaphore_count);
+ }
+ c_tickets = FIX2LONG(tickets);
+ } else {
+ c_tickets = -1;
+ }
+
+ return c_tickets;
+}
+
+static const char* check_id_arg(VALUE id)
+{
+ const char *c_id_str = NULL;
+
+ if (TYPE(id) != T_SYMBOL && TYPE(id) != T_STRING) {
+ rb_raise(rb_eTypeError, "id must be a symbol or string");
+ }
+ if (TYPE(id) == T_SYMBOL) {
+ c_id_str = rb_id2name(rb_to_id(id));
+ } else if (TYPE(id) == T_STRING) {
+ c_id_str = RSTRING_PTR(id);
+ }
+
+ return c_id_str;
+}
+
+static double check_default_timeout_arg(VALUE default_timeout)
+{
+ if (TYPE(default_timeout) != T_FIXNUM && TYPE(default_timeout) != T_FLOAT) {
+ rb_raise(rb_eTypeError, "expected numeric type for default_timeout");
+ }
+
+ if (NUM2DBL(default_timeout) < 0) {
+ rb_raise(rb_eArgError, "default timeout must be non-negative value");
+ }
+ return NUM2DBL(default_timeout);
+}
+
+static void
+ms_to_timespec(long ms, struct timespec *ts)
+{
+ ts->tv_sec = ms / 1000;
+ ts->tv_nsec = (ms % 1000) * 1000000;
+}
+
+static inline void
+semian_resource_mark(void *ptr)
+{
+ /* noop */
+}
+
+static inline void
+semian_resource_free(void *ptr)
+{
+ semian_resource_t *res = (semian_resource_t *) ptr;
+ if (res->name) {
+ free(res->name);
+ res->name = NULL;
+ }
+ xfree(res);
+}
+
+static inline size_t
+semian_resource_memsize(const void *ptr)
+{
+ return sizeof(semian_resource_t);
+}
+
+static const rb_data_type_t
+semian_resource_type = {
+ "semian_resource",
+ {
+ semian_resource_mark,
+ semian_resource_free,
+ semian_resource_memsize
+ },
+ NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY
+};
diff --git a/ext/semian/resource.h b/ext/semian/resource.h
new file mode 100644
index 00000000..e7595e90
--- /dev/null
+++ b/ext/semian/resource.h
@@ -0,0 +1,75 @@
+/*
+For core semian resource functions exposed directly to ruby.
+
+Functions here are associated with rubyland operations.
+*/
+#ifndef SEMIAN_RESOURCE_H
+#define SEMIAN_RESOURCE_H
+
+#include "types.h"
+#include "sysv_semaphores.h"
+#include "tickets.h"
+
+// Ruby variables
+ID id_timeout;
+int system_max_semaphore_count;
+
+/*
+ * call-seq:
+ * Semian::Resource.new(id, tickets, permissions, default_timeout) -> resource
+ *
+ * Creates a new Resource. Do not create resources directly. Use Semian.register.
+ */
+VALUE
+semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE permissions, VALUE default_timeout);
+
+/*
+ * call-seq:
+ * resource.acquire(timeout: default_timeout) { ... } -> result of the block
+ *
+ * Acquires a resource. The call will block for timeout
seconds if a ticket
+ * is not available. If no ticket is available within the timeout period, Semian::TimeoutError
+ * will be raised.
+ *
+ * If no timeout argument is provided, the default timeout passed to Semian.register will be used.
+ *
+ */
+VALUE
+semian_resource_acquire(int argc, VALUE *argv, VALUE self);
+
+/*
+ * call-seq:
+ * resource.destroy() -> true
+ *
+ * Destroys a resource. This method will destroy the underlying SysV semaphore.
+ * If there is any code in other threads or processes blocking or using the resource
+ * they will likely raise.
+ *
+ * Use this method very carefully.
+ */
+VALUE
+semian_resource_destroy(VALUE self);
+
+/*
+ * call-seq:
+ * resource.count -> count
+ *
+ * Returns the current ticket count for a resource.
+ */
+VALUE
+semian_resource_count(VALUE self);
+
+/*
+ * call-seq:
+ * resource.semid -> id
+ *
+ * Returns the SysV semaphore id of a resource.
+ */
+VALUE
+semian_resource_id(VALUE self);
+
+// Allocate a semian_resource_type struct for ruby memory management
+VALUE
+semian_resource_alloc(VALUE klass);
+
+#endif //SEMIAN_RESOURCE_H
diff --git a/ext/semian/semian.c b/ext/semian/semian.c
index 2a346bf6..9272d61f 100644
--- a/ext/semian/semian.c
+++ b/ext/semian/semian.c
@@ -1,457 +1,4 @@
-#include
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-#include
-
-#include
-
-#include
-
-union semun {
- int val; /* Value for SETVAL */
- struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
- unsigned short *array; /* Array for GETALL, SETALL */
- struct seminfo *__buf; /* Buffer for IPC_INFO
- (Linux-specific) */
-};
-
-#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
-// 2.0
-#include
-#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b))
-#elif defined(HAVE_RB_THREAD_BLOCKING_REGION)
- // 1.9
-typedef VALUE (*my_blocking_fn_t)(void*);
-#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
-#endif
-
-static ID id_timeout;
-static VALUE eSyscall, eTimeout, eInternal;
-static int system_max_semaphore_count;
-
-typedef enum
-{
- SI_SEM_TICKETS, // semaphore for the tickets currently issued
- SI_SEM_CONFIGURED_TICKETS, // semaphore to track the desired number of tickets available for issue
- SI_SEM_LOCK, // metadata lock to act as a mutex, ensuring thread-safety for updating other semaphores
- SI_NUM_SEMAPHORES // always leave this as last entry for count to be accurate
-} semaphore_indices;
-
-typedef struct {
- int sem_id;
- struct timespec timeout;
- int error;
- char *name;
-} semian_resource_t;
-
-static key_t
-generate_key(const char *name)
-{
- union {
- unsigned char str[SHA_DIGEST_LENGTH];
- key_t key;
- } digest;
- SHA1((const unsigned char *) name, strlen(name), digest.str);
- /* TODO: compile-time assertion that sizeof(key_t) > SHA_DIGEST_LENGTH */
- return digest.key;
-}
-
-static void
-ms_to_timespec(long ms, struct timespec *ts)
-{
- ts->tv_sec = ms / 1000;
- ts->tv_nsec = (ms % 1000) * 1000000;
-}
-
-static void
-raise_semian_syscall_error(const char *syscall, int error_num)
-{
- rb_raise(eSyscall, "%s failed, errno: %d (%s)", syscall, error_num, strerror(error_num));
-}
-
-static void
-semian_resource_mark(void *ptr)
-{
- /* noop */
-}
-
-static void
-semian_resource_free(void *ptr)
-{
- semian_resource_t *res = (semian_resource_t *) ptr;
- if (res->name) {
- free(res->name);
- res->name = NULL;
- }
- xfree(res);
-}
-
-static size_t
-semian_resource_memsize(const void *ptr)
-{
- return sizeof(semian_resource_t);
-}
-
-static const rb_data_type_t
-semian_resource_type = {
- "semian_resource",
- {
- semian_resource_mark,
- semian_resource_free,
- semian_resource_memsize
- },
- NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY
-};
-
-static VALUE
-semian_resource_alloc(VALUE klass)
-{
- semian_resource_t *res;
- VALUE obj = TypedData_Make_Struct(klass, semian_resource_t, &semian_resource_type, res);
- return obj;
-}
-
-static void
-set_semaphore_permissions(int sem_id, int permissions)
-{
- union semun sem_opts;
- struct semid_ds stat_buf;
-
- sem_opts.buf = &stat_buf;
- semctl(sem_id, 0, IPC_STAT, sem_opts);
- if ((stat_buf.sem_perm.mode & 0xfff) != permissions) {
- stat_buf.sem_perm.mode &= ~0xfff;
- stat_buf.sem_perm.mode |= permissions;
- semctl(sem_id, 0, IPC_SET, sem_opts);
- }
-}
-
-static const int kInternalTimeout = 5; /* seconds */
-
-static int
-get_max_tickets(int sem_id)
-{
- int ret = semctl(sem_id, SI_SEM_CONFIGURED_TICKETS, GETVAL);
- if (ret == -1) {
- rb_raise(eInternal, "error getting max ticket count, errno: %d (%s)", errno, strerror(errno));
- }
- return ret;
-}
-
-static int
-perform_semop(int sem_id, short index, short op, short flags, struct timespec *ts)
-{
- struct sembuf buf = { 0 };
-
- buf.sem_num = index;
- buf.sem_op = op;
- buf.sem_flg = flags;
-
- if (ts) {
- return semtimedop(sem_id, &buf, 1, ts);
- } else {
- return semop(sem_id, &buf, 1);
- }
-}
-
-typedef struct {
- int sem_id;
- int tickets;
-} update_ticket_count_t;
-
-static VALUE
-update_ticket_count(update_ticket_count_t *tc)
-{
- short delta;
- struct timespec ts = { 0 };
- ts.tv_sec = kInternalTimeout;
-
- if (get_max_tickets(tc->sem_id) != tc->tickets) {
- delta = tc->tickets - get_max_tickets(tc->sem_id);
-
- if (perform_semop(tc->sem_id, SI_SEM_TICKETS, delta, 0, &ts) == -1) {
- rb_raise(eInternal, "error setting ticket count, errno: %d (%s)", errno, strerror(errno));
- }
-
- if (semctl(tc->sem_id, SI_SEM_CONFIGURED_TICKETS, SETVAL, tc->tickets) == -1) {
- rb_raise(eInternal, "error updating max ticket count, errno: %d (%s)", errno, strerror(errno));
- }
- }
-
- return Qnil;
-}
-
-static void
-configure_tickets(int sem_id, int tickets, int should_initialize)
-{
- struct timespec ts = { 0 };
- unsigned short init_vals[SI_NUM_SEMAPHORES];
- struct timeval start_time, cur_time;
- update_ticket_count_t tc;
- int state;
-
- if (should_initialize) {
- init_vals[SI_SEM_TICKETS] = init_vals[SI_SEM_CONFIGURED_TICKETS] = tickets;
- init_vals[SI_SEM_LOCK] = 1;
- if (semctl(sem_id, 0, SETALL, init_vals) == -1) {
- raise_semian_syscall_error("semctl()", errno);
- }
- } else if (tickets > 0) {
- /* it's possible that we haven't actually initialized the
- semaphore structure yet - wait a bit in that case */
- if (get_max_tickets(sem_id) == 0) {
- gettimeofday(&start_time, NULL);
- while (get_max_tickets(sem_id) == 0) {
- usleep(10000); /* 10ms */
- gettimeofday(&cur_time, NULL);
- if ((cur_time.tv_sec - start_time.tv_sec) > kInternalTimeout) {
- rb_raise(eInternal, "timeout waiting for semaphore initialization");
- }
- }
- }
-
- /*
- If the current max ticket count is not the same as the requested ticket
- count, we need to resize the count. We do this by adding the delta of
- (tickets - current_max_tickets) to the semaphore value.
- */
- if (get_max_tickets(sem_id) != tickets) {
- ts.tv_sec = kInternalTimeout;
-
- if (perform_semop(sem_id, SI_SEM_LOCK, -1, SEM_UNDO, &ts) == -1) {
- raise_semian_syscall_error("error acquiring internal semaphore lock, semtimedop()", errno);
- }
-
- tc.sem_id = sem_id;
- tc.tickets = tickets;
- rb_protect((VALUE (*)(VALUE)) update_ticket_count, (VALUE) &tc, &state);
-
- if (perform_semop(sem_id, SI_SEM_LOCK, 1, SEM_UNDO, NULL) == -1) {
- raise_semian_syscall_error("error releasing internal semaphore lock, semop()", errno);
- }
-
- if (state) {
- rb_jump_tag(state);
- }
- }
- }
-}
-
-static int
-create_semaphore(int key, int permissions, int *created)
-{
- int semid = 0;
- int flags = 0;
-
- *created = 0;
- flags = IPC_EXCL | IPC_CREAT | FIX2LONG(permissions);
-
- semid = semget(key, SI_NUM_SEMAPHORES, flags);
- if (semid >= 0) {
- *created = 1;
- } else if (semid == -1 && errno == EEXIST) {
- flags &= ~IPC_EXCL;
- semid = semget(key, SI_NUM_SEMAPHORES, flags);
- }
- return semid;
-}
-
-static int
-get_semaphore(int key)
-{
- return semget(key, SI_NUM_SEMAPHORES, 0);
-}
-
-/*
- * call-seq:
- * Semian::Resource.new(id, tickets, permissions, default_timeout) -> resource
- *
- * Creates a new Resource. Do not create resources directly. Use Semian.register.
- */
-static VALUE
-semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE permissions, VALUE default_timeout)
-{
- key_t key;
- int created = 0;
- semian_resource_t *res = NULL;
- const char *id_str = NULL;
-
- if (TYPE(id) != T_SYMBOL && TYPE(id) != T_STRING) {
- rb_raise(rb_eTypeError, "id must be a symbol or string");
- }
- if (TYPE(tickets) == T_FLOAT) {
- rb_warn("semian ticket value %f is a float, converting to fixnum", RFLOAT_VALUE(tickets));
- tickets = INT2FIX((int) RFLOAT_VALUE(tickets));
- }
- Check_Type(tickets, T_FIXNUM);
- Check_Type(permissions, T_FIXNUM);
- if (TYPE(default_timeout) != T_FIXNUM && TYPE(default_timeout) != T_FLOAT) {
- rb_raise(rb_eTypeError, "expected numeric type for default_timeout");
- }
- if (FIX2LONG(tickets) < 0 || FIX2LONG(tickets) > system_max_semaphore_count) {
- rb_raise(rb_eArgError, "ticket count must be a non-negative value and less than %d", system_max_semaphore_count);
- }
- if (NUM2DBL(default_timeout) < 0) {
- rb_raise(rb_eArgError, "default timeout must be non-negative value");
- }
-
- if (TYPE(id) == T_SYMBOL) {
- id_str = rb_id2name(rb_to_id(id));
- } else if (TYPE(id) == T_STRING) {
- id_str = RSTRING_PTR(id);
- }
- TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
- key = generate_key(id_str);
- ms_to_timespec(NUM2DBL(default_timeout) * 1000, &res->timeout);
- res->name = strdup(id_str);
-
- res->sem_id = FIX2LONG(tickets) == 0 ? get_semaphore(key) : create_semaphore(key, permissions, &created);
- if (res->sem_id == -1) {
- raise_semian_syscall_error("semget()", errno);
- }
-
- configure_tickets(res->sem_id, FIX2LONG(tickets), created);
-
- set_semaphore_permissions(res->sem_id, FIX2LONG(permissions));
-
- return self;
-}
-
-static VALUE
-cleanup_semian_resource_acquire(VALUE self)
-{
- semian_resource_t *res = NULL;
- TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
- if (perform_semop(res->sem_id, SI_SEM_TICKETS, 1, SEM_UNDO, NULL) == -1) {
- res->error = errno;
- }
- return Qnil;
-}
-
-static void *
-acquire_semaphore_without_gvl(void *p)
-{
- semian_resource_t *res = (semian_resource_t *) p;
- res->error = 0;
- if (perform_semop(res->sem_id, SI_SEM_TICKETS, -1, SEM_UNDO, &res->timeout) == -1) {
- res->error = errno;
- }
- return NULL;
-}
-
-/*
- * call-seq:
- * resource.acquire(timeout: default_timeout) { ... } -> result of the block
- *
- * Acquires a resource. The call will block for timeout
seconds if a ticket
- * is not available. If no ticket is available within the timeout period, Semian::TimeoutError
- * will be raised.
- *
- * If no timeout argument is provided, the default timeout passed to Semian.register will be used.
- *
- */
-static VALUE
-semian_resource_acquire(int argc, VALUE *argv, VALUE self)
-{
- semian_resource_t *self_res = NULL;
- semian_resource_t res = { 0 };
-
- if (!rb_block_given_p()) {
- rb_raise(rb_eArgError, "acquire requires a block");
- }
-
- TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, self_res);
- res = *self_res;
-
- /* allow the default timeout to be overridden by a "timeout" param */
- if (argc == 1 && TYPE(argv[0]) == T_HASH) {
- VALUE timeout = rb_hash_aref(argv[0], ID2SYM(id_timeout));
- if (TYPE(timeout) != T_NIL) {
- if (TYPE(timeout) != T_FLOAT && TYPE(timeout) != T_FIXNUM) {
- rb_raise(rb_eArgError, "timeout parameter must be numeric");
- }
- ms_to_timespec(NUM2DBL(timeout) * 1000, &res.timeout);
- }
- } else if (argc > 0) {
- rb_raise(rb_eArgError, "invalid arguments");
- }
-
- /* release the GVL to acquire the semaphore */
- WITHOUT_GVL(acquire_semaphore_without_gvl, &res, RUBY_UBF_IO, NULL);
- if (res.error != 0) {
- if (res.error == EAGAIN) {
- rb_raise(eTimeout, "timed out waiting for resource '%s'", res.name);
- } else {
- raise_semian_syscall_error("semop()", res.error);
- }
- }
-
- return rb_ensure(rb_yield, self, cleanup_semian_resource_acquire, self);
-}
-
-/*
- * call-seq:
- * resource.destroy() -> true
- *
- * Destroys a resource. This method will destroy the underlying SysV semaphore.
- * If there is any code in other threads or processes blocking or using the resource
- * they will likely raise.
- *
- * Use this method very carefully.
- */
-static VALUE
-semian_resource_destroy(VALUE self)
-{
- semian_resource_t *res = NULL;
-
- TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
- if (semctl(res->sem_id, 0, IPC_RMID) == -1) {
- raise_semian_syscall_error("semctl()", errno);
- }
-
- return Qtrue;
-}
-
-/*
- * call-seq:
- * resource.count -> count
- *
- * Returns the current ticket count for a resource.
- */
-static VALUE
-semian_resource_count(VALUE self)
-{
- int ret;
- semian_resource_t *res = NULL;
-
- TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
- ret = semctl(res->sem_id, 0, GETVAL);
- if (ret == -1) {
- raise_semian_syscall_error("semctl()", errno);
- }
-
- return LONG2FIX(ret);
-}
-
-/*
- * call-seq:
- * resource.semid -> id
- *
- * Returns the SysV semaphore id of a resource.
- */
-static VALUE
-semian_resource_id(VALUE self)
-{
- semian_resource_t *res = NULL;
- TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);
- return LONG2FIX(res->sem_id);
-}
+#include
void Init_semian()
{
diff --git a/ext/semian/semian.h b/ext/semian/semian.h
new file mode 100644
index 00000000..3a8f76ec
--- /dev/null
+++ b/ext/semian/semian.h
@@ -0,0 +1,26 @@
+/*
+System, 3rd party, and project includes
+
+Implements Init_semian, which is used as C/Ruby entrypoint.
+*/
+
+#ifndef SEMIAN_H
+#define SEMIAN_H
+
+// System includes
+#include
+#include
+#include
+
+// 3rd party includes
+#include
+#include
+#include
+#include
+
+//semian includes
+#include "resource.h"
+
+void Init_semian();
+
+#endif //SEMIAN_H
diff --git a/ext/semian/sysv_semaphores.c b/ext/semian/sysv_semaphores.c
new file mode 100644
index 00000000..6d4b10db
--- /dev/null
+++ b/ext/semian/sysv_semaphores.c
@@ -0,0 +1,69 @@
+#include
+
+const char *SEMINDEX_STRING[] = {
+ FOREACH_SEMINDEX(GENERATE_STRING)
+};
+
+void
+raise_semian_syscall_error(const char *syscall, int error_num)
+{
+ rb_raise(eSyscall, "%s failed, errno: %d (%s)", syscall, error_num, strerror(error_num));
+}
+
+key_t
+generate_sem_set_key(const char *name)
+{
+ char semset_size_key[20];
+ char *uniq_id_str;
+
+ // It is necessary for the cardinatily of the semaphore set to be part of the key
+ // or else sem_get will complain that we have requested an incorrect number of sems
+ // for the desired key, and have changed the number of semaphores for a given key
+ sprintf(semset_size_key, "_NUM_SEMS_%d", SI_NUM_SEMAPHORES);
+ uniq_id_str = malloc(strlen(name)+strlen(semset_size_key)+1);
+ strcpy(uniq_id_str, name);
+ strcat(uniq_id_str, semset_size_key);
+
+ union {
+ unsigned char str[SHA_DIGEST_LENGTH];
+ key_t key;
+ } digest;
+ SHA1((const unsigned char *) uniq_id_str, strlen(uniq_id_str), digest.str);
+ free(uniq_id_str);
+ /* TODO: compile-time assertion that sizeof(key_t) > SHA_DIGEST_LENGTH */
+ return digest.key;
+}
+
+void
+set_semaphore_permissions(int sem_id, long permissions)
+{
+ union semun sem_opts;
+ struct semid_ds stat_buf;
+
+ sem_opts.buf = &stat_buf;
+ semctl(sem_id, 0, IPC_STAT, sem_opts);
+ if ((stat_buf.sem_perm.mode & 0xfff) != permissions) {
+ stat_buf.sem_perm.mode &= ~0xfff;
+ stat_buf.sem_perm.mode |= permissions;
+ semctl(sem_id, 0, IPC_SET, sem_opts);
+ }
+}
+
+int
+create_semaphore(int key, long permissions, int *created)
+{
+ int semid = 0;
+ int flags = 0;
+
+ *created = 0;
+ flags = IPC_EXCL | IPC_CREAT | permissions;
+
+ semid = semget(key, SI_NUM_SEMAPHORES, flags);
+ if (semid >= 0) {
+ *created = 1;
+ } else if (semid == -1 && errno == EEXIST) {
+ flags &= ~IPC_EXCL;
+ semid = semget(key, SI_NUM_SEMAPHORES, flags);
+ }
+ return semid;
+}
diff --git a/ext/semian/sysv_semaphores.h b/ext/semian/sysv_semaphores.h
new file mode 100644
index 00000000..b6e8aba3
--- /dev/null
+++ b/ext/semian/sysv_semaphores.h
@@ -0,0 +1,145 @@
+/*
+For manipulating the semian's semaphore set
+
+Semian semaphore operations and initialization,
+and functions associated directly weth semops.
+*/
+#ifndef SEMIAN_SEMSET_H
+#define SEMIAN_SEMSET_H
+
+#include
+
+// Defines for ruby threading primitives
+#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
+// 2.0
+#include
+#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b))
+#elif defined(HAVE_RB_THREAD_BLOCKING_REGION)
+ // 1.9
+typedef VALUE (*my_blocking_fn_t)(void*);
+#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
+#endif
+
+// Time to wait for timed ops to complete
+#define INTERNAL_TIMEOUT 5 // seconds
+
+VALUE eSyscall, eTimeout, eInternal;
+
+// Here we define an enum value and string representation of each semaphore
+// This allows us to key the sem value and string rep in sync easily
+// utilizing pre-processor macros.
+// SI_SEM_TICKETS semaphore for the tickets currently issued
+// SI_SEM_CONFIGURED_TICKETS semaphore to track the desired number of tickets available for issue
+// SI_SEM_LOCK metadata lock to act as a mutex, ensuring thread-safety for updating other semaphores
+// SI_NUM_SEMAPHORES always leave this as last entry for count to be accurate
+#define FOREACH_SEMINDEX(SEMINDEX) \
+ SEMINDEX(SI_SEM_TICKETS) \
+ SEMINDEX(SI_SEM_CONFIGURED_TICKETS) \
+ SEMINDEX(SI_SEM_LOCK) \
+ SEMINDEX(SI_NUM_SEMAPHORES) \
+
+#define GENERATE_ENUM(ENUM) ENUM,
+#define GENERATE_STRING(STRING) #STRING,
+
+// Generate enum for sem indices
+enum SEMINDEX_ENUM {
+ FOREACH_SEMINDEX(GENERATE_ENUM)
+};
+
+// Generate string rep for sem indices for debugging puproses
+extern const char *SEMINDEX_STRING[];
+
+// Helper for syscall verbose debugging
+void
+raise_semian_syscall_error(const char *syscall, int error_num);
+
+// Genurates a unique key for the semaphore from the resource id
+key_t
+generate_sem_set_key(const char *name);
+
+// Set semaphore UNIX octal permissions
+void
+set_semaphore_permissions(int sem_id, long permissions);
+
+// Create a new sysV IPC semaphore set
+int
+create_semaphore(int key, long permissions, int *created);
+
+// Wrapper to performs a semop call
+// The call may be timed or untimed
+static inline int
+perform_semop(int sem_id, short index, short op, short flags, struct timespec *ts)
+{
+ struct sembuf buf = { 0 };
+
+ buf.sem_num = index;
+ buf.sem_op = op;
+ buf.sem_flg = flags;
+
+ if (ts) {
+ return semtimedop(sem_id, &buf, 1, ts);
+ } else {
+ return semop(sem_id, &buf, 1);
+ }
+}
+
+// Retrieve the current number of tickets in a semaphore by its semaphore index
+static inline int
+get_sem_val(int sem_id, int sem_index)
+{
+ int ret = semctl(sem_id, sem_index, GETVAL);
+ if (ret == -1) {
+ rb_raise(eInternal, "error getting value of %s, errno: %d (%s)", SEMINDEX_STRING[sem_index], errno, strerror(errno));
+ }
+ return ret;
+}
+
+// Obtain an exclusive lock on the semaphore set critical section
+static inline void
+sem_meta_lock(int sem_id)
+{
+ struct timespec ts = { 0 };
+ ts.tv_sec = INTERNAL_TIMEOUT;
+
+ if (perform_semop(sem_id, SI_SEM_LOCK, -1, SEM_UNDO, &ts) == -1) {
+ raise_semian_syscall_error("error acquiring internal semaphore lock, semtimedop()", errno);
+ }
+}
+
+// Release an exclusive lock on the semaphore set critical section
+static inline void
+sem_meta_unlock(int sem_id)
+{
+ if (perform_semop(sem_id, SI_SEM_LOCK, 1, SEM_UNDO, NULL) == -1) {
+ raise_semian_syscall_error("error releasing internal semaphore lock, semop()", errno);
+ }
+}
+
+// Retrieve a semaphore's ID from its key
+static inline int
+get_semaphore(int key)
+{
+ return semget(key, SI_NUM_SEMAPHORES, 0);
+}
+
+// WARNING: Never call directly
+// Decrements the ticket semaphore within the semaphore set
+static inline void *
+acquire_semaphore(void *p)
+{
+ semian_resource_t *res = (semian_resource_t *) p;
+ res->error = 0;
+ if (perform_semop(res->sem_id, SI_SEM_TICKETS, -1, SEM_UNDO, &res->timeout) == -1) {
+ res->error = errno;
+ }
+ return NULL;
+}
+
+// Acquire a ticket with the ruby Global VM lock released
+static inline void *
+acquire_semaphore_without_gvl(void *p)
+{
+ WITHOUT_GVL(acquire_semaphore, p, RUBY_UBF_IO, NULL);
+ return NULL;
+}
+#endif // SEMIAN_SEMSET_H
diff --git a/ext/semian/tickets.c b/ext/semian/tickets.c
new file mode 100644
index 00000000..f8a811af
--- /dev/null
+++ b/ext/semian/tickets.c
@@ -0,0 +1,104 @@
+#include
+
+// "Private" function forward declarations
+static void
+initialize_tickets(int sem_id, int tickets);
+
+static void
+configure_static_tickets(int sem_id, int tickets);
+
+VALUE
+update_ticket_count(update_ticket_count_t *tc)
+{
+ short delta;
+ struct timespec ts = { 0 };
+ ts.tv_sec = INTERNAL_TIMEOUT;
+
+ if (get_sem_val(tc->sem_id, SI_SEM_CONFIGURED_TICKETS) != tc->tickets) {
+ delta = tc->tickets - get_sem_val(tc->sem_id, SI_SEM_CONFIGURED_TICKETS);
+
+ if (perform_semop(tc->sem_id, SI_SEM_TICKETS, delta, 0, &ts) == -1) {
+ rb_raise(eInternal, "error setting ticket count, errno: %d (%s)", errno, strerror(errno));
+ }
+
+ if (semctl(tc->sem_id, SI_SEM_CONFIGURED_TICKETS, SETVAL, tc->tickets) == -1) {
+ rb_raise(eInternal, "error updating configured ticket count, errno: %d (%s)", errno, strerror(errno));
+ }
+ }
+
+ return Qnil;
+}
+void
+configure_tickets(int sem_id, int tickets, int should_initialize)
+{
+ if (should_initialize) {
+ initialize_tickets(sem_id, tickets);
+ }
+
+ if (tickets > 0) {
+ configure_static_tickets(sem_id, tickets);
+ }
+}
+
+/*
+*********************************************************************************************************
+"Private"
+
+These functions are specific to semian ticket interals and may not be called by other files
+*********************************************************************************************************
+*/
+
+static void
+initialize_tickets(int sem_id, int tickets)
+{
+ unsigned short init_vals[SI_NUM_SEMAPHORES];
+
+ if (tickets > 0) {
+ init_vals[SI_SEM_TICKETS] = init_vals[SI_SEM_CONFIGURED_TICKETS] = tickets;
+ }
+ init_vals[SI_SEM_LOCK] = 1;
+ if (semctl(sem_id, 0, SETALL, init_vals) == -1) {
+ raise_semian_syscall_error("semctl()", errno);
+ }
+}
+
+static void
+configure_static_tickets(int sem_id, int tickets)
+{
+ int state;
+ struct timeval start_time, cur_time;
+ update_ticket_count_t tc;
+
+ /* it's possible that we haven't actually initialized the
+ semaphore structure yet - wait a bit in that case */
+ if (get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS) == 0) {
+ gettimeofday(&start_time, NULL);
+ while (get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS) == 0) {
+ usleep(10000); /* 10ms */
+ gettimeofday(&cur_time, NULL);
+ if ((cur_time.tv_sec - start_time.tv_sec) > INTERNAL_TIMEOUT) {
+ rb_raise(eInternal, "timeout waiting for semaphore initialization");
+ }
+ }
+ }
+
+ /*
+ If the current configured ticket count is not the same as the requested ticket
+ count, we need to resize the count. We do this by adding the delta of
+ (tickets - current_configured_tickets) to the semaphore value.
+ */
+ if (get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS) != tickets) {
+
+ sem_meta_lock(sem_id);
+
+ tc.sem_id = sem_id;
+ tc.tickets = tickets;
+ rb_protect((VALUE (*)(VALUE)) update_ticket_count, (VALUE) &tc, &state);
+
+ sem_meta_unlock(sem_id);
+
+ if (state) {
+ rb_jump_tag(state);
+ }
+ }
+}
diff --git a/ext/semian/tickets.h b/ext/semian/tickets.h
new file mode 100644
index 00000000..9ef022aa
--- /dev/null
+++ b/ext/semian/tickets.h
@@ -0,0 +1,17 @@
+/*
+For logic specific to manipulating semian ticket counts
+*/
+#ifndef SEMIAN_TICKETS_H
+#define SEMIAN_TICKETS_H
+
+#include
+
+// Update the ticket count for static ticket tracking
+VALUE
+update_ticket_count(update_ticket_count_t *tc);
+
+// Set initial ticket values upon resource creation
+void
+configure_tickets(int sem_id, int tickets, int should_initialize);
+
+#endif // SEMIAN_TICKETS_H
diff --git a/ext/semian/types.h b/ext/semian/types.h
new file mode 100644
index 00000000..c0ed4616
--- /dev/null
+++ b/ext/semian/types.h
@@ -0,0 +1,36 @@
+/*
+For custom type definitions specific to semian
+*/
+#ifndef SEMIAN_TYPES_H
+#define SEMIAN_TYPES_H
+
+#include
+#include
+#include
+#include
+
+// For sysV semop syscals
+// see man semop
+union semun {
+ int val; /* Value for SETVAL */
+ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
+ unsigned short *array; /* Array for GETALL, SETALL */
+ struct seminfo *__buf; /* Buffer for IPC_INFO
+ (Linux-specific) */
+};
+
+// To update the ticket count
+typedef struct {
+ int sem_id;
+ int tickets;
+} update_ticket_count_t;
+
+// Internal semaphore structure
+typedef struct {
+ int sem_id;
+ struct timespec timeout;
+ int error;
+ char *name;
+} semian_resource_t;
+
+#endif // SEMIAN_TYPES_H
diff --git a/semian.gemspec b/semian.gemspec
index d3f70efa..de9ed752 100644
--- a/semian.gemspec
+++ b/semian.gemspec
@@ -16,7 +16,7 @@ Gem::Specification.new do |s|
s.email = 'scott.francis@shopify.com'
s.license = 'MIT'
- s.files = `git ls-files`.split("\n")
+ s.files = Dir['{lib,ext}/**/**/*.{rb,h,c}']
s.extensions = ['ext/semian/extconf.rb']
s.add_development_dependency 'rake-compiler', '~> 0.9'
s.add_development_dependency 'rake', '< 11.0'