Skip to content

Commit

Permalink
Merge pull request #535 from cole-miller/revamp
Browse files Browse the repository at this point in the history
Start of work on dqlite revamp
  • Loading branch information
Mathieu Borderé authored Oct 17, 2023
2 parents d16ac97 + 3d41a8a commit b437252
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 30 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ libdqlite_la_SOURCES = \
src/registry.c \
src/request.c \
src/response.c \
src/revamp.c \
src/roles.c \
src/server.c \
src/stmt.c \
Expand Down
72 changes: 57 additions & 15 deletions src/bind.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "tuple.h"

/* Bind a single parameter. */
static int bind_one(sqlite3_stmt *stmt, int n, struct value *value)
static int bind_one(sqlite3_stmt *stmt, int n, const struct value *value)
{
int rc;

Expand Down Expand Up @@ -48,37 +48,79 @@ static int bind_one(sqlite3_stmt *stmt, int n, struct value *value)
return rc;
}

int bind__params(sqlite3_stmt *stmt, struct cursor *cursor, int format)
int parseParams(struct cursor *cursor, int format, struct value **out)
{
struct tuple_decoder decoder;
struct value *head;
struct value *prev;
unsigned long i;
int rc;
int rv;

assert(format == TUPLE__PARAMS || format == TUPLE__PARAMS32);

sqlite3_reset(stmt);

/* If the payload has been fully consumed, it means there are no
* parameters to bind. */
if (cursor->cap == 0) {
return 0;
}

rc = tuple_decoder__init(&decoder, 0, format, cursor);
if (rc != 0) {
return rc;
rv = tuple_decoder__init(&decoder, 0, format, cursor);
if (rv != 0) {
goto err;
}

head = sqlite3_malloc(sizeof(*head));
if (head == NULL) {
rv = DQLITE_NOMEM;
goto err;
}
prev = head;
for (i = 0; i < tuple_decoder__n(&decoder); i++) {
struct value value;
rc = tuple_decoder__next(&decoder, &value);
if (rc != 0) {
return rc;
prev->next = sqlite3_malloc(sizeof(*prev->next));
if (prev->next == NULL) {
goto err_after_alloc_head;
}
rc = bind_one(stmt, (int)(i + 1), &value);
if (rc != 0) {
return rc;
rv = tuple_decoder__next(&decoder, prev->next);
if (rv != 0) {
goto err_after_alloc_head;
}
prev = prev->next;
}

*out = head;
return 0;

err_after_alloc_head:
freeParams(head);
err:
return rv;
}

int bindParams(sqlite3_stmt *stmt, const struct value *params)
{
int i;
int rv;

i = 1;
for (const struct value *cur = params; cur != NULL; cur = cur->next) {
rv = bind_one(stmt, i, cur);
if (rv != 0) {
return rv;
}
i += 1;
}
return 0;
}

void freeParams(struct value *params)
{
struct value *cur;
struct value *old;

cur = params;
while (cur != NULL) {
old = cur;
cur = old->next;
sqlite3_free(old);
}
}
11 changes: 7 additions & 4 deletions src/bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@

#include "lib/serialize.h"

/**
* Bind the parameters of the given statement by decoding the given payload.
*/
int bind__params(sqlite3_stmt *stmt, struct cursor *cursor, int format);
struct value;

int parseParams(struct cursor *cursor, int format, struct value **out);

int bindParams(sqlite3_stmt *stmt, const struct value *params);

void freeParams(struct value *params);

#endif /* BIND_H_*/
6 changes: 4 additions & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "message.h"
#include "protocol.h"
#include "request.h"
#include "revamp.h"
#include "tracing.h"
#include "transport.h"

Expand Down Expand Up @@ -295,7 +296,8 @@ int conn__start(struct conn *c,
struct uv_stream_s *stream,
struct raft_uv_transport *uv_transport,
struct id_state seed,
conn_close_cb close_cb)
conn_close_cb close_cb,
struct db_context *db_ctx)
{
int rv;
(void)loop;
Expand All @@ -309,7 +311,7 @@ int conn__start(struct conn *c,
c->transport.data = c;
c->uv_transport = uv_transport;
c->close_cb = close_cb;
gateway__init(&c->gateway, config, registry, raft, seed);
gateway__init(&c->gateway, config, registry, raft, seed, db_ctx);
rv = buffer__init(&c->read);
if (rv != 0) {
goto err_after_transport_init;
Expand Down
4 changes: 3 additions & 1 deletion src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "gateway.h"
#include "id.h"
#include "message.h"
#include "revamp.h"

/**
* Callbacks.
Expand Down Expand Up @@ -52,7 +53,8 @@ int conn__start(struct conn *c,
struct uv_stream_s *stream,
struct raft_uv_transport *uv_transport,
struct id_state seed,
conn_close_cb close_cb);
conn_close_cb close_cb,
struct db_context *db_ctx);

/**
* Force closing the connection. The close callback will be invoked when it's
Expand Down
48 changes: 42 additions & 6 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ void gateway__init(struct gateway *g,
struct config *config,
struct registry *registry,
struct raft *raft,
struct id_state seed)
struct id_state seed,
struct db_context *db_ctx)
{
tracef("gateway init");
g->config = config;
Expand All @@ -31,6 +32,7 @@ void gateway__init(struct gateway *g,
g->protocol = DQLITE_PROTOCOL_VERSION;
g->client_id = 0;
g->random_state = seed;
g->db_ctx = db_ctx;
}

void gateway__leader_close(struct gateway *g, int reason)
Expand Down Expand Up @@ -465,6 +467,7 @@ static int handle_exec(struct gateway *g, struct handle *req)
struct request_exec request = {0};
int tuple_format;
uint64_t req_id;
struct value *params;
int rv;

switch (req->schema) {
Expand All @@ -491,12 +494,20 @@ static int handle_exec(struct gateway *g, struct handle *req)
LOOKUP_DB(request.db_id);
LOOKUP_STMT(request.stmt_id);
FAIL_IF_CHECKPOINTING;
rv = bind__params(stmt->stmt, cursor, tuple_format);
rv = parseParams(cursor, tuple_format, &params);
if (rv != 0) {
tracef("handle exec parse params failed %d", rv);
failure(req, rv, "parse parameters");
return 0;
}
rv = bindParams(stmt->stmt, params);
if (rv != 0) {
tracef("handle exec bind failed %d", rv);
freeParams(params);
failure(req, rv, "bind parameters");
return 0;
}
freeParams(params);
req->stmt_id = stmt->id;
g->req = req;
req_id = idNext(&g->random_state);
Expand Down Expand Up @@ -595,6 +606,7 @@ static int handle_query(struct gateway *g, struct handle *req)
int tuple_format;
bool is_readonly;
uint64_t req_id;
struct value *params;
int rv;

switch (req->schema) {
Expand All @@ -621,12 +633,20 @@ static int handle_query(struct gateway *g, struct handle *req)
LOOKUP_DB(request.db_id);
LOOKUP_STMT(request.stmt_id);
FAIL_IF_CHECKPOINTING;
rv = bind__params(stmt->stmt, cursor, tuple_format);
rv = parseParams(cursor, tuple_format, &params);
if (rv != 0) {
tracef("handle query parse params failed %d", rv);
failure(req, rv, "bind parameters");
return 0;
}
rv = bindParams(stmt->stmt, params);
if (rv != 0) {
tracef("handle query bind failed %d", rv);
freeParams(params);
failure(req, rv, "bind parameters");
return 0;
}
freeParams(params);
req->stmt_id = stmt->id;
g->req = req;

Expand Down Expand Up @@ -697,6 +717,7 @@ static void handle_exec_sql_next(struct gateway *g,
const char *tail;
int tuple_format;
uint64_t req_id;
struct value *params;
int rv;

if (req->sql == NULL || strcmp(req->sql, "") == 0) {
Expand Down Expand Up @@ -728,8 +749,14 @@ static void handle_exec_sql_next(struct gateway *g,
/* Should have been caught by handle_exec_sql */
assert(0);
}
rv = bind__params(stmt, cursor, tuple_format);
if (rv != SQLITE_OK) {
rv = parseParams(cursor, tuple_format, &params);
if (rv != 0) {
failure(req, rv, "parse parameters");
goto done_after_prepare;
}
rv = bindParams(stmt, params);
if (rv != 0) {
freeParams(params);
failure(req, rv, "bind parameters");
goto done_after_prepare;
}
Expand Down Expand Up @@ -847,6 +874,7 @@ static void querySqlBarrierCb(struct barrier *barrier, int status)
int tuple_format;
bool is_readonly;
uint64_t req_id;
struct value *params;
int rv;

if (status != 0) {
Expand Down Expand Up @@ -886,9 +914,17 @@ static void querySqlBarrierCb(struct barrier *barrier, int status)
/* Should have been caught by handle_query_sql */
assert(0);
}
rv = bind__params(stmt, cursor, tuple_format);
rv = parseParams(cursor, tuple_format, &params);
if (rv != 0) {
tracef("handle query sql parse params failed %d", rv);
sqlite3_finalize(stmt);
failure(req, rv, "parse parameters");
return;
}
rv = bindParams(stmt, params);
if (rv != 0) {
tracef("handle query sql bind failed %d", rv);
freeParams(params);
sqlite3_finalize(stmt);
failure(req, rv, "bind parameters");
return;
Expand Down
4 changes: 3 additions & 1 deletion src/gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ struct gateway
uint64_t protocol; /* Protocol format version */
uint64_t client_id;
struct id_state random_state; /* For generating IDs */
struct db_context *db_ctx;
};

void gateway__init(struct gateway *g,
struct config *config,
struct registry *registry,
struct raft *raft,
struct id_state seed);
struct id_state seed,
struct db_context *db_ctx);

void gateway__close(struct gateway *g);

Expand Down
1 change: 1 addition & 0 deletions src/revamp.c
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "revamp.h"
11 changes: 11 additions & 0 deletions src/revamp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#ifndef DQLITE_REVAMP_H
#define DQLITE_REVAMP_H

#include <semaphore.h>

struct db_context
{
sem_t sem;
};

#endif
24 changes: 23 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ void dqlite__close(struct dqlite_node *d)
if (!d->initialized) {
return;
}
sem_destroy(&d->db_ctx->sem);
free(d->db_ctx);
raft_free(d->listener);
rv = sem_destroy(&d->stopped);
assert(rv == 0); /* Fails only if sem object is not valid */
Expand Down Expand Up @@ -526,6 +528,9 @@ static void stopCb(uv_async_t *stop)
conn__stop(conn);
}
raft_close(&d->raft, raftCloseCb);

sem_post(&d->db_ctx->sem);
pthread_join(d->db_thread, NULL);
}

/* Callback invoked as soon as the loop as started.
Expand Down Expand Up @@ -624,7 +629,7 @@ static void listenCb(uv_stream_t *listener, int status)
goto err;
}
rv = conn__start(conn, &t->config, &t->loop, &t->registry, &t->raft,
stream, &t->raft_transport, seed, destroy_conn);
stream, &t->raft_transport, seed, destroy_conn, t->db_ctx);
if (rv != 0) {
goto err_after_conn_alloc;
}
Expand Down Expand Up @@ -671,10 +676,27 @@ static void roleManagementTimerCb(uv_timer_t *handle)
RolesAdjust(d);
}

static void *dbTask(void *arg)
{
struct db_context *ctx = arg;
sem_wait(&ctx->sem);
return NULL;
}

static int taskRun(struct dqlite_node *d)
{
int rv;

d->db_ctx = malloc(sizeof *d->db_ctx);
if (d->db_ctx == NULL) {
return DQLITE_NOMEM;
}
rv = sem_init(&d->db_ctx->sem, 0, 0);
assert(rv == 0);

rv = pthread_create(&d->db_thread, NULL, dbTask, d->db_ctx);
assert(rv == 0);

/* TODO: implement proper cleanup upon error by spinning the loop a few
* times. */
assert(d->listener != NULL);
Expand Down
Loading

0 comments on commit b437252

Please sign in to comment.