Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added min_tickets to the bulkheads #248

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
/tmp
/pkg
*.gem
*.log
*.orig
/html/
Gemfile.lock
vendor/
Expand All @@ -18,3 +20,5 @@ nohup.out

# IntelliJ/RubyMine/CLion project files
.idea
CMakeLists.txt
cmake-build-debug
michaelkipper marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions ext/semian/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
have_func 'rb_thread_blocking_region'
have_func 'rb_thread_call_without_gvl'

$CFLAGS = "-D_GNU_SOURCE -Werror -Wall "
if ENV.key?('DEBUG')
$CFLAGS = "-D_GNU_SOURCE -Werror -Wall -std=gnu99 "
if ENV.key?('DEBUG') || ENV.key?('SEMIAN_DEBUG')
$CFLAGS << "-O0 -g -DDEBUG"
else
$CFLAGS << "-O3"
Expand Down
23 changes: 9 additions & 14 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,24 +200,19 @@ semian_resource_key(VALUE self)
}

VALUE
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout)
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout, VALUE min_tickets)
{
long c_permissions;
double c_timeout;
double c_quota;
int c_tickets;
semian_resource_t *res = NULL;
const char *c_id_str = NULL;

// Check and cast arguments
check_tickets_xor_quota_arg(tickets, quota);
michaelkipper marked this conversation as resolved.
Show resolved Hide resolved
c_quota = check_quota_arg(quota);
c_tickets = check_tickets_arg(tickets);
c_permissions = check_permissions_arg(permissions);
c_id_str = check_id_arg(id);
c_timeout = check_default_timeout_arg(default_timeout);
double c_quota = check_quota_arg(quota);
int c_tickets = check_tickets_arg(tickets);
long c_permissions = check_permissions_arg(permissions);
const char *c_id_str = check_id_arg(id);
double c_timeout = check_default_timeout_arg(default_timeout);
int c_min_tickets = check_tickets_arg(min_tickets);

// Build semian resource structure
semian_resource_t *res = NULL;
TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

// Populate struct fields
Expand All @@ -227,7 +222,7 @@ semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VAL
res->wait_time = -1;

// Initialize the semaphore set
initialize_semaphore_set(res, c_id_str, c_permissions, c_tickets, c_quota);
initialize_semaphore_set(res, c_id_str, c_permissions, c_tickets, c_min_tickets, c_quota);

return self;
}
Expand Down
2 changes: 1 addition & 1 deletion ext/semian/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ int system_max_semaphore_count;
* Creates a new Resource. Do not create resources directly. Use Semian.register.
*/
VALUE
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout);
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout, VALUE min_tickets);

/*
* call-seq:
Expand Down
2 changes: 1 addition & 1 deletion ext/semian/semian.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void Init_semian()
eInternal = rb_const_get(cSemian, rb_intern("InternalError"));

rb_define_alloc_func(cResource, semian_resource_alloc);
rb_define_method(cResource, "initialize_semaphore", semian_resource_initialize, 5);
rb_define_method(cResource, "initialize_semaphore", semian_resource_initialize, 6);
rb_define_method(cResource, "acquire", semian_resource_acquire, -1);
rb_define_method(cResource, "count", semian_resource_count, 0);
rb_define_method(cResource, "semid", semian_resource_id, 0);
Expand Down
3 changes: 2 additions & 1 deletion ext/semian/sysv_semaphores.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ raise_semian_syscall_error(const char *syscall, int error_num)
}

void
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota)
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, int min_tickets, double quota)
{

res->key = generate_key(id_str);
Expand Down Expand Up @@ -69,6 +69,7 @@ initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permis
configure_tickets_args_t configure_tickets_args = (configure_tickets_args_t){
.sem_id = res->sem_id,
.tickets = tickets,
.min_tickets = min_tickets,
.quota = quota,
};
rb_protect(
Expand Down
7 changes: 3 additions & 4 deletions ext/semian/sysv_semaphores.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and functions associated directly weth semops.

#include "types.h"
#include "tickets.h"
#include "util.h"

// Defines for ruby threading primitives
#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
Expand Down Expand Up @@ -71,7 +72,7 @@ raise_semian_syscall_error(const char *syscall, int error_num);

// Initialize the sysv semaphore structure
void
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota);
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, int min_tickets, double quota);

// Set semaphore UNIX octal permissions
void
Expand Down Expand Up @@ -106,17 +107,15 @@ get_semaphore(int key);
void *
acquire_semaphore_without_gvl(void *p);

#ifdef DEBUG
static inline void
print_sem_vals(int sem_id)
{
printf("lock %d, tickets: %d configured: %d, registered workers %d\n",
dprintf("lock %d, tickets: %d configured: %d, registered workers %d",
get_sem_val(sem_id, SI_SEM_LOCK),
get_sem_val(sem_id, SI_SEM_TICKETS),
get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS),
get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS)
);
}
#endif

#endif // SEMIAN_SEMSET_H
25 changes: 19 additions & 6 deletions ext/semian/tickets.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ static VALUE
update_ticket_count(int sem_id, int count);

static int
calculate_quota_tickets(int sem_id, double quota);
calculate_quota_tickets(int sem_id, double quota, int min_tickets);

// Must be called with the semaphore meta lock already acquired
VALUE
Expand All @@ -14,7 +14,7 @@ configure_tickets(VALUE value)
configure_tickets_args_t *args = (configure_tickets_args_t *)value;

if (args->quota > 0) {
args->tickets = calculate_quota_tickets(args->sem_id, args->quota);
args->tickets = calculate_quota_tickets(args->sem_id, args->quota, args->min_tickets);
}

/*
Expand Down Expand Up @@ -68,9 +68,22 @@ update_ticket_count(int sem_id, int tickets)
}

static int
calculate_quota_tickets (int sem_id, double quota)
min(const int a, const int b)
{
int tickets = 0;
tickets = (int) ceil(get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS) * quota);
return tickets;
return a < b ? a : b;
}

static int
max(const int a, const int b)
{
return a > b ? a : b;
}

static int
calculate_quota_tickets (int sem_id, double quota, int min_tickets)
michaelkipper marked this conversation as resolved.
Show resolved Hide resolved
{
int workers = get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS);
int tickets = (int) ceil(workers * quota);
dprintf("Calculating quota tickets - sem_id:%d quota:%0.2f%% workers:%d min_tickets:%d tickets:%d", sem_id, quota, workers, min_tickets, tickets);
return min_tickets > 0 ? min(workers, max(tickets, min_tickets)) : tickets;
thegedge marked this conversation as resolved.
Show resolved Hide resolved
michaelkipper marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions ext/semian/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ union semun {
typedef struct {
int sem_id;
int tickets;
int min_tickets;
double quota;
} configure_tickets_args_t;

Expand Down
25 changes: 25 additions & 0 deletions ext/semian/util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef EXT_SEMIAN_UTIL_H
#define EXT_SEMIAN_UTIL_H

#include <stdarg.h>
#include <stdio.h>
#include <time.h>

#if defined(DEBUG) || defined(SEMIAN_DEBUG)
michaelkipper marked this conversation as resolved.
Show resolved Hide resolved
# define DEBUG_TEST 1
#else
# define DEBUG_TEST 0
#endif

#define dprintf(fmt, ...) \
do { \
if (DEBUG_TEST) { \
const pid_t pid = getpid(); \
struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); \
struct tm t; localtime_r(&(ts.tv_sec), &t); \
char buf[128]; strftime(buf, sizeof(buf), "%H:%M:%S", &t); \
printf("%s.%ld [DEBUG] (%d): %s:%d - " fmt "\n", buf, ts.tv_nsec, pid, __FILE__, __LINE__, ##__VA_ARGS__); \
} \
} while (0)

#endif // EXT_SEMIAN_UTIL_H
4 changes: 2 additions & 2 deletions lib/semian/resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ def instance(*args)
end
end

def initialize(name, tickets: nil, quota: nil, permissions: 0660, timeout: 0)
def initialize(name, tickets: nil, quota: nil, permissions: 0660, timeout: 0, min_tickets: 1)
if Semian.semaphores_enabled?
initialize_semaphore(name, tickets, quota, permissions, timeout) if respond_to?(:initialize_semaphore)
initialize_semaphore(name, tickets, quota, permissions, timeout, min_tickets) if respond_to?(:initialize_semaphore)
else
Semian.issue_disabled_semaphores_warning
end
Expand Down
20 changes: 20 additions & 0 deletions scripts/cleanup-ipc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

ME=`whoami`

IPCS_S=`ipcs -s | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "`
IPCS_M=`ipcs -m | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "`
IPCS_Q=`ipcs -q | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "`

for id in $IPCS_M; do
ipcrm -m $id;
done

for id in $IPCS_S; do
ipcrm -s $id;
done

for id in $IPCS_Q; do
ipcrm -q $id;
done
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also tack on an xargs for all of the above:

ipcs -s | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" " | xargs -n1 ipcrm -s
ipcs -m | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" " | xargs -n1 ipcrm -m
ipcs -q | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" " | xargs -n1 ipcrm -q


55 changes: 53 additions & 2 deletions test/resource_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class TestResource < Minitest::Test
EPSILON = 0.1

def setup
@resources = []
@workers = []
Semian.destroy(:testing)
rescue
nil
Expand Down Expand Up @@ -490,6 +492,55 @@ def test_multiple_register_with_fork
assert_equal 0, timeouts
end

def test_min_tickets
id = Time.now.strftime('%H:%M:%S.%N')
resource = Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: 2)
assert_equal(1, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true)
assert_equal(2, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true)
assert_equal(2, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true)
assert_equal(2, resource.tickets)
fork_workers(resource: id, count: 12, quota: 0.49, min_tickets: 2, timeout: 0.1, wait_for_timeout: true)
assert_equal(8, resource.tickets)
end

def test_min_tickets_nil
id = Time.now.strftime('%H:%M:%S.%N')
resource = Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: nil)
assert_equal(1, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true)
assert_equal(1, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true)
assert_equal(2, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true)
assert_equal(2, resource.tickets)
fork_workers(resource: id, count: 12, quota: 0.49, min_tickets: nil, timeout: 0.1, wait_for_timeout: true)
assert_equal(8, resource.tickets)
end

def test_min_tickets_zero
id = Time.now.strftime('%H:%M:%S.%N')
resource = Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: 0)
assert_equal(1, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 0, timeout: 0.1, wait_for_timeout: true)
assert_equal(1, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 0, timeout: 0.1, wait_for_timeout: true)
assert_equal(2, resource.tickets)
fork_workers(resource: id, count: 1, quota: 0.49, min_tickets: 0, timeout: 0.1, wait_for_timeout: true)
assert_equal(2, resource.tickets)
fork_workers(resource: id, count: 12, quota: 0.49, min_tickets: 0, timeout: 0.1, wait_for_timeout: true)
assert_equal(8, resource.tickets)
end

def test_min_tickets_negative
id = Time.now.strftime('%H:%M:%S.%N')
assert_raises ArgumentError do
Semian::Resource.new(id, quota: 0.49, timeout: 0.1, min_tickets: -1)
end
end

def create_resource(*args)
@resources ||= []
resource = Semian::Resource.new(*args)
Expand All @@ -515,14 +566,14 @@ def destroy_resources
# Active workers are accumulated in the instance variable @workers,
# and workers must be cleaned up between tests by the teardown script
# An exit value of 100 is to keep track of timeouts, 0 for success.
def fork_workers(count:, resource: :testing, quota: nil, tickets: nil, timeout: 0.1, wait_for_timeout: false)
def fork_workers(count:, resource: :testing, quota: nil, tickets: nil, min_tickets: nil, timeout: 0.1, wait_for_timeout: false)
fail 'Must provide at least one of tickets or quota' unless tickets || quota

@workers ||= []
count.times do
@workers << fork do
begin
resource = Semian::Resource.new(resource.to_sym, quota: quota, tickets: tickets, timeout: timeout)
resource = Semian::Resource.new(resource.to_sym, quota: quota, tickets: tickets, min_tickets: min_tickets, timeout: timeout)

Signal.trap('TERM') do
yield if block_given?
Expand Down