diff --git a/.gitignore b/.gitignore index 95d00207..3834a68b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ /tmp /pkg *.gem +*.log +*.orig /html/ Gemfile.lock vendor/ @@ -18,3 +20,5 @@ nohup.out # IntelliJ/RubyMine/CLion project files .idea +CMakeLists.txt +cmake-build-debug diff --git a/README.md b/README.md index 5f79a855..4c478e01 100644 --- a/README.md +++ b/README.md @@ -174,7 +174,8 @@ You may now set quotas per worker: ```ruby client = Redis.new(semian: { name: "inventory", - quota: 0.5, + quota: 0.49, + min_tickets: 2, success_threshold: 2, error_threshold: 4, error_timeout: 20 @@ -182,17 +183,18 @@ client = Redis.new(semian: { ``` -Per the above example, you no longer need to care about the number of tickets. +Per the above example, you no longer need to care about the number of tickets. Rather, the tickets shall be computed as a proportion of the number of active workers. -Rather, the tickets shall be computed as a proportion of the number of active workers. +In this case, we'd allow 49% of the workers on a particular host to connect to this redis resource. -In this case, we'd allow 50% of the workers on a particular host to connect to this redis resource. +In particular, 1 worker = 1 ticket (due to `ceil`), 2 workers = 2 tickets (due to `min_tickets`), 4 workers = 2 tickets (due to `ceil`), 100 workers = 49 tickets. **Note**: - You must pass **exactly** one of ticket or quota. - Tickets available will be the ceiling of the quota ratio to the number of workers - - So, with one worker, there will always be a minimum of 1 ticket + - So, with one worker, there will always be a minimum of 1 ticket + - If you want to guarantee 2 tickets when there are 2 workers, use `min_tickets: 2` - Workers in different processes will automatically unregister when the process exits. #### Net::HTTP @@ -484,7 +486,9 @@ still experimenting with ways to figure out optimal ticket numbers. Generally something below half the number of workers on the server for endpoints that are queried frequently has worked well for us. -* **tickets**. Number of workers that can concurrently access a resource. +* **tickets**. Number of workers that can concurrently access a resource. (Mutually exclusive with **quota**.) +* **quota**. Percentage of workers that can concurrently access a resource. (Mutually exclusive with **tickets**.) +* **min_tickets**. Minimum number of tickets to allow when using **quota**. * **timeout**. Time to wait in seconds to acquire a ticket if there are no tickets left. We recommend this to be `0` unless you have very few workers running (i.e. less than ~5). diff --git a/ext/semian/extconf.rb b/ext/semian/extconf.rb index ac2c511b..88ab4e3f 100644 --- a/ext/semian/extconf.rb +++ b/ext/semian/extconf.rb @@ -23,8 +23,8 @@ have_func 'rb_thread_blocking_region' have_func 'rb_thread_call_without_gvl' -$CFLAGS = "-D_GNU_SOURCE -Werror -Wall " -if ENV.key?('DEBUG') +$CFLAGS = "-D_GNU_SOURCE -Werror -Wall -std=gnu99 " +if ENV.key?('DEBUG') || ENV.key?('SEMIAN_DEBUG') $CFLAGS << "-O0 -g -DDEBUG" else $CFLAGS << "-O3" diff --git a/ext/semian/resource.c b/ext/semian/resource.c index f4af4b5c..51e61ceb 100644 --- a/ext/semian/resource.c +++ b/ext/semian/resource.c @@ -4,7 +4,7 @@ static VALUE cleanup_semian_resource_acquire(VALUE self); static void -check_tickets_xor_quota_arg(VALUE tickets, VALUE quota); +check_tickets_xor_quota_arg(VALUE tickets, VALUE min_tickets, VALUE quota); static double check_quota_arg(VALUE quota); @@ -12,6 +12,9 @@ check_quota_arg(VALUE quota); static int check_tickets_arg(VALUE tickets); +static int +check_min_tickets_arg(VALUE min_tickets); + static long check_permissions_arg(VALUE permissions); @@ -200,24 +203,19 @@ semian_resource_key(VALUE self) } VALUE -semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout) +semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout, VALUE min_tickets) { - long c_permissions; - double c_timeout; - double c_quota; - int c_tickets; - semian_resource_t *res = NULL; - const char *c_id_str = NULL; - // Check and cast arguments - check_tickets_xor_quota_arg(tickets, quota); - c_quota = check_quota_arg(quota); - c_tickets = check_tickets_arg(tickets); - c_permissions = check_permissions_arg(permissions); - c_id_str = check_id_arg(id); - c_timeout = check_default_timeout_arg(default_timeout); + check_tickets_xor_quota_arg(tickets, min_tickets, quota); + double c_quota = check_quota_arg(quota); + int c_tickets = check_tickets_arg(tickets); + long c_permissions = check_permissions_arg(permissions); + const char *c_id_str = check_id_arg(id); + double c_timeout = check_default_timeout_arg(default_timeout); + int c_min_tickets = check_min_tickets_arg(min_tickets); // Build semian resource structure + semian_resource_t *res = NULL; TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); // Populate struct fields @@ -227,7 +225,7 @@ semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VAL res->wait_time = -1; // Initialize the semaphore set - initialize_semaphore_set(res, c_id_str, c_permissions, c_tickets, c_quota); + initialize_semaphore_set(res, c_id_str, c_permissions, c_tickets, c_min_tickets, c_quota); return self; } @@ -259,10 +257,22 @@ check_permissions_arg(VALUE permissions) } static void -check_tickets_xor_quota_arg(VALUE tickets, VALUE quota) +check_tickets_xor_quota_arg(VALUE tickets, VALUE min_tickets, VALUE quota) { - if ((TYPE(tickets) == T_NIL && TYPE(quota) == T_NIL) ||(TYPE(tickets) != T_NIL && TYPE(quota) != T_NIL)){ - rb_raise(rb_eArgError, "Must pass exactly one of ticket or quota"); + const char *msg = "Must pass exactly one of ticket or quota/min_tickets"; + if (TYPE(quota) != T_NIL) { + if (TYPE(tickets) != T_NIL) { + dprintf("FOO"); + rb_raise(rb_eArgError, msg); + } + } else if (TYPE(tickets) != T_NIL) { + if (TYPE(quota) != T_NIL || TYPE(min_tickets) != T_NIL) { + dprintf("FOO"); + rb_raise(rb_eArgError, msg); + } + } else { + dprintf("FOO"); + rb_raise(rb_eArgError, msg); } } @@ -308,6 +318,32 @@ check_tickets_arg(VALUE tickets) return c_tickets; } +static int +check_min_tickets_arg(VALUE min_tickets) +{ + int retval = -1; + + switch (rb_type(min_tickets)) { + case T_NIL: + case T_UNDEF: + return -1; + case T_FLOAT: + rb_warn("semian min_tickets value %f is a float, converting to fixnum", RFLOAT_VALUE(min_tickets)); + retval = (int) RFLOAT_VALUE(min_tickets); + break; + case T_FIXNUM: + retval = FIX2LONG(min_tickets); break; + default: + retval = -1; break; + } + + if (retval <= 0 || retval > system_max_semaphore_count) { + rb_raise(rb_eArgError, "max_tickets must be in range [1,%d)", system_max_semaphore_count); + } + + return retval; +} + static const char* check_id_arg(VALUE id) { diff --git a/ext/semian/resource.h b/ext/semian/resource.h index f726bf23..1a9a22db 100644 --- a/ext/semian/resource.h +++ b/ext/semian/resource.h @@ -21,7 +21,7 @@ int system_max_semaphore_count; * Creates a new Resource. Do not create resources directly. Use Semian.register. */ VALUE -semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout); +semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout, VALUE min_tickets); /* * call-seq: diff --git a/ext/semian/semian.c b/ext/semian/semian.c index 0cae6fdd..38b8f478 100644 --- a/ext/semian/semian.c +++ b/ext/semian/semian.c @@ -42,7 +42,7 @@ void Init_semian() eInternal = rb_const_get(cSemian, rb_intern("InternalError")); rb_define_alloc_func(cResource, semian_resource_alloc); - rb_define_method(cResource, "initialize_semaphore", semian_resource_initialize, 5); + rb_define_method(cResource, "initialize_semaphore", semian_resource_initialize, 6); rb_define_method(cResource, "acquire", semian_resource_acquire, -1); rb_define_method(cResource, "count", semian_resource_count, 0); rb_define_method(cResource, "semid", semian_resource_id, 0); diff --git a/ext/semian/sysv_semaphores.c b/ext/semian/sysv_semaphores.c index bed7e4f3..a6ecccdb 100644 --- a/ext/semian/sysv_semaphores.c +++ b/ext/semian/sysv_semaphores.c @@ -28,7 +28,7 @@ raise_semian_syscall_error(const char *syscall, int error_num) } void -initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota) +initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, int min_tickets, double quota) { res->key = generate_key(id_str); @@ -69,6 +69,7 @@ initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permis configure_tickets_args_t configure_tickets_args = (configure_tickets_args_t){ .sem_id = res->sem_id, .tickets = tickets, + .min_tickets = min_tickets, .quota = quota, }; rb_protect( diff --git a/ext/semian/sysv_semaphores.h b/ext/semian/sysv_semaphores.h index c3f2d106..c7d07440 100644 --- a/ext/semian/sysv_semaphores.h +++ b/ext/semian/sysv_semaphores.h @@ -18,6 +18,7 @@ and functions associated directly weth semops. #include "types.h" #include "tickets.h" +#include "util.h" // Defines for ruby threading primitives #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) @@ -71,7 +72,7 @@ raise_semian_syscall_error(const char *syscall, int error_num); // Initialize the sysv semaphore structure void -initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota); +initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, int min_tickets, double quota); // Set semaphore UNIX octal permissions void @@ -106,17 +107,15 @@ get_semaphore(int key); void * acquire_semaphore_without_gvl(void *p); -#ifdef DEBUG static inline void print_sem_vals(int sem_id) { - printf("lock %d, tickets: %d configured: %d, registered workers %d\n", + dprintf("lock %d, tickets: %d configured: %d, registered workers %d", get_sem_val(sem_id, SI_SEM_LOCK), get_sem_val(sem_id, SI_SEM_TICKETS), get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS), get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS) ); } -#endif #endif // SEMIAN_SEMSET_H diff --git a/ext/semian/tickets.c b/ext/semian/tickets.c index 369f3058..257bad0a 100644 --- a/ext/semian/tickets.c +++ b/ext/semian/tickets.c @@ -5,7 +5,7 @@ static VALUE update_ticket_count(int sem_id, int count); static int -calculate_quota_tickets(int sem_id, double quota); +calculate_quota_tickets(int sem_id, double quota, int min_tickets); // Must be called with the semaphore meta lock already acquired VALUE @@ -14,7 +14,7 @@ configure_tickets(VALUE value) configure_tickets_args_t *args = (configure_tickets_args_t *)value; if (args->quota > 0) { - args->tickets = calculate_quota_tickets(args->sem_id, args->quota); + args->tickets = calculate_quota_tickets(args->sem_id, args->quota, args->min_tickets); } /* @@ -68,9 +68,22 @@ update_ticket_count(int sem_id, int tickets) } static int -calculate_quota_tickets (int sem_id, double quota) +min(const int a, const int b) { - int tickets = 0; - tickets = (int) ceil(get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS) * quota); - return tickets; + return a < b ? a : b; +} + +static int +max(const int a, const int b) +{ + return a > b ? a : b; +} + +static int +calculate_quota_tickets(int sem_id, double quota, int min_tickets) +{ + int workers = get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS); + int tickets = (int) ceil(workers * quota); + dprintf("Calculating quota tickets - sem_id:%d quota:%0.2f%% workers:%d min_tickets:%d tickets:%d", sem_id, quota * 100.0, workers, min_tickets, tickets); + return min_tickets > 0 ? min(workers, max(tickets, min_tickets)) : tickets; } diff --git a/ext/semian/types.h b/ext/semian/types.h index de0254ec..aec518d7 100644 --- a/ext/semian/types.h +++ b/ext/semian/types.h @@ -23,6 +23,7 @@ union semun { typedef struct { int sem_id; int tickets; + int min_tickets; double quota; } configure_tickets_args_t; diff --git a/ext/semian/util.h b/ext/semian/util.h new file mode 100644 index 00000000..820098bb --- /dev/null +++ b/ext/semian/util.h @@ -0,0 +1,25 @@ +#ifndef EXT_SEMIAN_UTIL_H +#define EXT_SEMIAN_UTIL_H + +#include +#include +#include + +#ifdef DEBUG +# define DEBUG_TEST 1 +#else +# define DEBUG_TEST 0 +#endif + +#define dprintf(fmt, ...) \ + do { \ + if (DEBUG_TEST) { \ + const pid_t pid = getpid(); \ + struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); \ + struct tm t; localtime_r(&(ts.tv_sec), &t); \ + char buf[128]; strftime(buf, sizeof(buf), "%H:%M:%S", &t); \ + printf("%s.%ld [DEBUG] (%d): %s:%d - " fmt "\n", buf, ts.tv_nsec, pid, __FILE__, __LINE__, ##__VA_ARGS__); \ + } \ + } while (0) + +#endif // EXT_SEMIAN_UTIL_H diff --git a/lib/semian/resource.rb b/lib/semian/resource.rb index 47476a67..ffa3cc4a 100644 --- a/lib/semian/resource.rb +++ b/lib/semian/resource.rb @@ -9,9 +9,9 @@ def instance(*args) end end - def initialize(name, tickets: nil, quota: nil, permissions: 0660, timeout: 0) + def initialize(name, tickets: nil, quota: nil, permissions: 0660, timeout: 0, min_tickets: nil) if Semian.semaphores_enabled? - initialize_semaphore(name, tickets, quota, permissions, timeout) if respond_to?(:initialize_semaphore) + initialize_semaphore(name, tickets, quota, permissions, timeout, min_tickets) if respond_to?(:initialize_semaphore) else Semian.issue_disabled_semaphores_warning end diff --git a/scripts/cleanup-ipc.sh b/scripts/cleanup-ipc.sh new file mode 100755 index 00000000..b491f249 --- /dev/null +++ b/scripts/cleanup-ipc.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +ME=`whoami` + +IPCS_S=`ipcs -s | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "` +IPCS_M=`ipcs -m | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "` +IPCS_Q=`ipcs -q | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "` + +for id in $IPCS_M; do + ipcrm -m $id; +done + +for id in $IPCS_S; do + ipcrm -s $id; +done + +for id in $IPCS_Q; do + ipcrm -q $id; +done + diff --git a/test/resource_test.rb b/test/resource_test.rb index ad837852..6c213dae 100644 --- a/test/resource_test.rb +++ b/test/resource_test.rb @@ -7,6 +7,8 @@ class TestResource < Minitest::Test EPSILON = 0.1 def setup + @resources = [] + @workers = [] Semian.destroy(:testing) rescue nil @@ -490,6 +492,63 @@ def test_multiple_register_with_fork assert_equal 0, timeouts end + def test_min_tickets + id = Time.now.strftime('%H:%M:%S.%N') + resource = Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: 2) + assert_equal(1, resource.tickets) + fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true) + assert_equal(2, resource.tickets) + fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true) + assert_equal(2, resource.tickets) + fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true) + assert_equal(2, resource.tickets) + fork_workers(resource: id, count: 12, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true) + assert_equal(8, resource.tickets) + end + + def test_min_tickets_float + expected_warning = /semian min_tickets value 2\.000000 is a float, converting to fixnum/ + with_fake_std_error(warn_message: expected_warning) do + id = Time.now.strftime('%H:%M:%S.%N') + Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: 2.0) + end + end + + def test_min_tickets_nil + id = Time.now.strftime('%H:%M:%S.%N') + resource = Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: nil) + assert_equal(1, resource.tickets) + fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true) + assert_equal(1, resource.tickets) + fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true) + assert_equal(2, resource.tickets) + fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true) + assert_equal(2, resource.tickets) + fork_workers(resource: id, count: 12, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true) + assert_equal(8, resource.tickets) + end + + def test_min_tickets_zero + id = Time.now.strftime('%H:%M:%S.%N') + assert_raises ArgumentError do + Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: 0) + end + end + + def test_min_tickets_negative + id = Time.now.strftime('%H:%M:%S.%N') + assert_raises ArgumentError do + Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: -1) + end + end + + def test_min_tickets_out_of_range + id = Time.now.strftime('%H:%M:%S.%N') + assert_raises ArgumentError do + Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: 32768) + end + end + def create_resource(*args) @resources ||= [] resource = Semian::Resource.new(*args) @@ -515,14 +574,14 @@ def destroy_resources # Active workers are accumulated in the instance variable @workers, # and workers must be cleaned up between tests by the teardown script # An exit value of 100 is to keep track of timeouts, 0 for success. - def fork_workers(count:, resource: :testing, quota: nil, tickets: nil, timeout: 0.1, wait_for_timeout: false) + def fork_workers(count:, resource: :testing, quota: nil, tickets: nil, min_tickets: nil, timeout: 0.1, wait_for_timeout: false) fail 'Must provide at least one of tickets or quota' unless tickets || quota @workers ||= [] count.times do @workers << fork do begin - resource = Semian::Resource.new(resource.to_sym, quota: quota, tickets: tickets, timeout: timeout) + resource = Semian::Resource.new(resource.to_sym, quota: quota, tickets: tickets, min_tickets: min_tickets, timeout: timeout) Signal.trap('TERM') do yield if block_given? diff --git a/test/semian_test.rb b/test/semian_test.rb index da76d608..1d0e8e4f 100644 --- a/test/semian_test.rb +++ b/test/semian_test.rb @@ -35,7 +35,7 @@ def test_register_with_bulkhead_missing_options circuit_breaker: false, ) end - assert_equal exception.message, "Must pass exactly one of ticket or quota" + assert_equal exception.message, "Must pass exactly one of ticket or quota/min_tickets" end def test_unsuported_constants