Skip to content

Commit

Permalink
Merge pull request #538 from cole-miller/revamp
Browse files Browse the repository at this point in the history
More revamp work: preparing to send requests to the DB thread
  • Loading branch information
Mathieu Borderé authored Oct 18, 2023
2 parents a8ca486 + f638fd7 commit 65925a2
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 17 deletions.
48 changes: 48 additions & 0 deletions src/revamp.c
Original file line number Diff line number Diff line change
@@ -1 +1,49 @@
#include "revamp.h"

int dbContextInit(struct db_context *ctx, struct config *config)
{
int rv;
rv = pthread_mutex_init(&ctx->mutex, NULL);
assert(rv == 0);
rv = pthread_cond_init(&ctx->cond, NULL);
assert(rv == 0);
registry__init(&ctx->registry, config);
ctx->shutdown = false;
return 0;
}

int postExecSqlReq(struct db_context *ctx,
struct exec_sql_req req,
void *data,
void (*cb)(struct exec_sql_req, int, void *))
{
(void)ctx;
(void)req;
(void)data;
(void)cb;
return 0;
}

void dbContextClose(struct db_context *ctx)
{
registry__close(&ctx->registry);
pthread_cond_destroy(&ctx->cond);
pthread_mutex_destroy(&ctx->mutex);
}

void *dbTask(void *arg)
{
struct db_context *ctx = arg;
int rv;

rv = pthread_mutex_lock(&ctx->mutex);
assert(rv == 0);
for (;;) {
rv = pthread_cond_wait(&ctx->cond, &ctx->mutex);
assert(rv == 0);
if (ctx->shutdown) {
break;
}
}
return NULL;
}
36 changes: 32 additions & 4 deletions src/revamp.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,39 @@
#ifndef DQLITE_REVAMP_H
#define DQLITE_REVAMP_H
#ifndef DQLITE_REVAMP_H_
#define DQLITE_REVAMP_H_

#include <semaphore.h>
#include "lib/queue.h"
#include "registry.h"
#include "tuple.h"

#include <pthread.h>
#include <stdbool.h>

struct db_context
{
sem_t sem;
pthread_mutex_t mutex;
pthread_cond_t cond;
struct registry registry;
queue exec_sql_reqs;
bool shutdown;
};

struct exec_sql_req
{
char *db_name;
char *sql;
struct value *params;
queue queue;
};

int dbContextInit(struct db_context *ctx, struct config *config);

int postExecSqlReq(struct db_context *ctx,
struct exec_sql_req req,
void *data,
void (*cb)(struct exec_sql_req, int, void *));

void dbContextClose(struct db_context *ctx);

void *dbTask(void *arg);

#endif
31 changes: 18 additions & 13 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "lib/fs.h"
#include "logger.h"
#include "protocol.h"
#include "revamp.h"
#include "roles.h"
#include "tracing.h"
#include "translate.h"
Expand Down Expand Up @@ -167,8 +168,8 @@ void dqlite__close(struct dqlite_node *d)
if (!d->initialized) {
return;
}
sem_destroy(&d->db_ctx->sem);
free(d->db_ctx);
dbContextClose(d->db_ctx);
sqlite3_free(d->db_ctx);
raft_free(d->listener);
rv = sem_destroy(&d->stopped);
assert(rv == 0); /* Fails only if sem object is not valid */
Expand Down Expand Up @@ -529,7 +530,10 @@ static void stopCb(uv_async_t *stop)
}
raft_close(&d->raft, raftCloseCb);

sem_post(&d->db_ctx->sem);
pthread_mutex_lock(&d->db_ctx->mutex);
d->db_ctx->shutdown = true;
pthread_cond_signal(&d->db_ctx->cond);
pthread_mutex_unlock(&d->db_ctx->mutex);
pthread_join(d->db_thread, NULL);
}

Expand Down Expand Up @@ -676,24 +680,22 @@ static void roleManagementTimerCb(uv_timer_t *handle)
RolesAdjust(d);
}

static void *dbTask(void *arg)
{
struct db_context *ctx = arg;
sem_wait(&ctx->sem);
return NULL;
}

static int taskRun(struct dqlite_node *d)
{
int rv;

d->db_ctx = malloc(sizeof *d->db_ctx);
d->db_ctx = sqlite3_malloc(sizeof *d->db_ctx);
if (d->db_ctx == NULL) {
return DQLITE_NOMEM;
}
rv = sem_init(&d->db_ctx->sem, 0, 0);
assert(rv == 0);
rv = dbContextInit(d->db_ctx, &d->config);
if (rv != 0) {
sqlite3_free(d->db_ctx);
return rv;
}

rv = pthread_mutex_lock(&d->db_ctx->mutex);
assert(rv == 0);
rv = pthread_create(&d->db_thread, NULL, dbTask, d->db_ctx);
assert(rv == 0);

Expand Down Expand Up @@ -751,6 +753,9 @@ static int taskRun(struct dqlite_node *d)
return rv;
}

rv = pthread_mutex_unlock(&d->db_ctx->mutex);
assert(rv == 0);

rv = uv_run(&d->loop, UV_RUN_DEFAULT);
assert(rv == 0);

Expand Down

0 comments on commit 65925a2

Please sign in to comment.