Skip to content

Commit

Permalink
Add Future#await
Browse files Browse the repository at this point in the history
  • Loading branch information
Watson1978 committed Nov 11, 2023
1 parent f64d3c3 commit be3131b
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 13 deletions.
35 changes: 22 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,29 @@ prepare_future = Ilios::Cassandra.session.prepare_async(<<~CQL)
CQL

prepare_future.on_success { |statement|
statement.bind({
id: 1,
message: 'Hello World',
created_at: Time.now,
})
result_future = Ilios::Cassandra.session.execute_async(statement)
result_future.on_success { |result|
p result
p "success"
}
result_future.on_failure {
p "fail"
}
futures = []

10.times do |i|
statement.bind({
id: i,
message: 'Hello World',
created_at: Time.now,
})
result_future = Ilios::Cassandra.session.execute_async(statement)
result_future.on_success { |result|
p result
p "success"
}
result_future.on_failure {
p "fail"
}

futures << result_future
end
futures.each(&:await)
}

prepare_future.await
```

## Contributing
Expand Down
17 changes: 17 additions & 0 deletions ext/ilios/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ static VALUE future_result_yielder_synchronize(VALUE arg)
} else {
future_result_failure_yield(cassandra_future);
}
uv_sem_post(&cassandra_future->sem);
return Qnil;
}

Expand Down Expand Up @@ -221,6 +222,20 @@ static VALUE future_on_failure(VALUE self)
return self;
}

static VALUE future_await(VALUE self)
{
CassandraFuture *cassandra_future;

GET_FUTURE(self, cassandra_future);

if (cassandra_future->on_success_block || cassandra_future->on_failure_block) {
nogvl_sem_wait(&cassandra_future->sem);
} else {
nogvl_future_wait(cassandra_future->future);
}
return self;
}

static void future_mark(void *ptr)
{
CassandraFuture *cassandra_future = (CassandraFuture *)ptr;
Expand All @@ -238,6 +253,7 @@ static void future_destroy(void *ptr)
if (cassandra_future->future) {
cass_future_free(cassandra_future->future);
}
uv_sem_destroy(&cassandra_future->sem);
xfree(cassandra_future);
}

Expand All @@ -263,6 +279,7 @@ void Init_future(void)

rb_define_method(cFuture, "on_success", future_on_success, 0);
rb_define_method(cFuture, "on_failure", future_on_failure, 0);
rb_define_method(cFuture, "await", future_await, 0);

future_thread_pool_init(&thread_pool_prepare);
future_thread_pool_init(&thread_pool_execute);
Expand Down
3 changes: 3 additions & 0 deletions ext/ilios/ilios.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ typedef struct
VALUE on_success_block;
VALUE on_failure_block;
VALUE proc_mutex;

uv_sem_t sem;
} CassandraFuture;

extern const rb_data_type_t cassandra_session_data_type;
Expand Down Expand Up @@ -97,6 +99,7 @@ extern void Init_future(void);
extern void nogvl_future_wait(CassFuture *future);
extern CassFuture *nogvl_session_prepare(CassSession* session, VALUE query);
extern CassFuture *nogvl_session_execute(CassSession* session, CassStatement* statement);
extern void nogvl_sem_wait(uv_sem_t *sem);

extern void statement_default_config(CassandraStatement *cassandra_statement);
extern void result_await(CassandraResult *cassandra_result);
Expand Down
13 changes: 13 additions & 0 deletions ext/ilios/nogvl.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@ CassFuture *nogvl_session_execute(CassSession* session, CassStatement* statement
nogvl_session_execute_args args = { session, statement };
return (CassFuture *)rb_thread_call_without_gvl(nogvl_session_execute_cb, &args, RUBY_UBF_PROCESS, 0);
}

static void *nogvl_sem_wait_cb(void *ptr)
{
uv_sem_t *sem = (uv_sem_t *)ptr;
uv_sem_wait(sem);
return NULL;
}

void nogvl_sem_wait(uv_sem_t *sem)
{
// Releases GVL to run another thread while waiting
rb_thread_call_without_gvl(nogvl_sem_wait_cb, sem, RUBY_UBF_PROCESS, 0);
}
2 changes: 2 additions & 0 deletions ext/ilios/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static VALUE session_prepare_async(VALUE self, VALUE query)
cassandra_future->future = prepare_future;
cassandra_future->session_obj = self;
cassandra_future->proc_mutex = rb_mutex_new();
uv_sem_init(&cassandra_future->sem, 0);

return cassandra_future_obj;
}
Expand Down Expand Up @@ -85,6 +86,7 @@ static VALUE session_execute_async(VALUE self, VALUE statement)
cassandra_future->session_obj = self;
cassandra_future->statement_obj = statement;
cassandra_future->proc_mutex = rb_mutex_new();
uv_sem_init(&cassandra_future->sem, 0);

return cassandra_future_obj;
}
Expand Down
56 changes: 56 additions & 0 deletions test/test_future.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

require_relative 'helper'

class FutureTest < Minitest::Test
def test_await
count = 0

prepare_future = Ilios::Cassandra.session.prepare_async(<<~CQL)
INSERT INTO ilios.test (
id,
tinyint,
smallint,
int,
bigint,
float,
double,
boolean,
text,
timestamp,
uuid
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
CQL

prepare_future.on_success do |statement|
futures = []

10.times do |i|
statement.bind(
{
id: i,
tinyint: i,
smallint: i,
int: i,
bigint: i,
float: i,
double: i,
boolean: true,
text: 'hello',
timestamp: Time.now,
uuid: SecureRandom.uuid
}
)
result_future = Ilios::Cassandra.session.execute_async(statement)
result_future.on_success do
count += 1
end
futures << result_future
end
futures.each(&:await)
end
prepare_future.await

assert_equal(10, count)
end
end

0 comments on commit be3131b

Please sign in to comment.