Skip to content

Commit

Permalink
API for creating task specifications (#5)
Browse files Browse the repository at this point in the history
* API for creating task specifications

* fixes

* add more checks and improve comments
  • Loading branch information
pcmoritz authored and robertnishihara committed Sep 17, 2016
1 parent 73f4b96 commit 0b7d81c
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 2 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ $(BUILD)/db_tests: hiredis test/db_tests.c thirdparty/greatest.h event_loop.c st
$(BUILD)/socket_tests: test/socket_tests.c thirdparty/greatest.h sockets.c
$(CC) -o $@ test/socket_tests.c sockets.c $(CFLAGS) -I. -Ithirdparty

$(BUILD)/task_tests: test/task_tests.c task.c sockets.c common.h
$(CC) -o $@ test/task_tests.c task.c sockets.c $(CFLAGS) -I. -Ithirdparty

clean:
rm -r $(BUILD)/*

Expand All @@ -21,8 +24,8 @@ redis:
hiredis:
git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make

test: hiredis redis $(BUILD)/db_tests $(BUILD)/socket_tests FORCE
test: hiredis redis $(BUILD)/db_tests $(BUILD)/socket_tests $(BUILD)/task_tests FORCE
./thirdparty/redis-3.2.3/src/redis-server &
sleep 1s ; ./build/db_tests ; ./build/socket_tests
sleep 1s ; ./build/db_tests ; ./build/socket_tests ; ./build/task_tests

FORCE:
8 changes: 8 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)

#define CHECK(COND) \
do { \
if (!(COND)) { \
LOG_ERR("Check failure: %s", #COND); \
exit(-1); \
} \
} while (0);

#define UNIQUE_ID_SIZE 20

typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;
Expand Down
133 changes: 133 additions & 0 deletions task.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "task.h"
#include "common.h"
#include "sockets.h"

/* Tasks are stored in a consecutive chunk of memory, the first
* sizeof(task_spec) bytes are arranged according to the struct
* task_spec. Then there is an array of task_args of length
* (num_args + num_returns), and then follows the data of
* pass-by-value arguments of size args_value_size. The offsets in the
* task_arg.val are with respect to the end of the augmented structure,
* i.e. with respect to the address &task_spec.args_and_returns[0] +
* (task_spec->num_args + task_spec->num_returns) * sizeof(task_arg). */

typedef struct {
/* Either ARG_BY_REF or ARG_BY_VAL. */
int8_t type;
union {
object_id obj_id;
struct {
/* Offset where the data associated to this arg is located relative
* to &task_spec.args_and_returns[0]. */
ptrdiff_t offset;
int64_t length;
} value;
};
} task_arg;

struct task_spec_impl {
function_id func_id;
/* Total number of arguments. */
int64_t num_args;
/* Index of the last argument that has been constructed. */
int64_t arg_index;
/* Number of return values. */
int64_t num_returns;
/* Number of bytes the pass-by-value arguments are occupying. */
int64_t args_value_size;
/* The offset of the number of bytes of pass-by-value data that
* has been written so far, relative to &task_spec->args_and_returns[0] +
* (task_spec->num_args + task_spec->num_returns) * sizeof(task_arg) */
int64_t args_value_offset;
/* Argument and return IDs as well as offsets for pass-by-value args. */
task_arg args_and_returns[0];
};

task_spec *alloc_task_spec(function_id func_id,
int64_t num_args,
int64_t num_returns,
int64_t args_value_size) {
int64_t size = sizeof(task_spec) +
(num_args + num_returns) * sizeof(task_arg) + args_value_size;
task_spec *task = malloc(size);
memset(task, 0, size);
task->func_id = func_id;
task->num_args = num_args;
task->arg_index = 0;
task->num_returns = num_returns;
task->args_value_size = args_value_size;
return task;
}

int64_t task_num_args(task_spec *spec) {
return spec->num_args;
}

int64_t task_num_returns(task_spec *spec) {
return spec->num_returns;
}

int8_t task_arg_type(task_spec *spec, int64_t arg_index) {
CHECK(0 <= arg_index && arg_index < spec->num_args);
return spec->args_and_returns[arg_index].type;
}

object_id *task_arg_id(task_spec *spec, int64_t arg_index) {
CHECK(0 <= arg_index && arg_index < spec->num_args);
task_arg *arg = &spec->args_and_returns[arg_index];
CHECK(arg->type == ARG_BY_REF)
return &arg->obj_id;
}

uint8_t *task_arg_val(task_spec *spec, int64_t arg_index) {
CHECK(0 <= arg_index && arg_index < spec->num_args);
task_arg *arg = &spec->args_and_returns[arg_index];
CHECK(arg->type == ARG_BY_VAL);
uint8_t *data = (uint8_t *) &spec->args_and_returns[0];
data += (spec->num_args + spec->num_returns) * sizeof(task_arg);
return data + arg->value.offset;
}

int64_t task_arg_length(task_spec *spec, int64_t arg_index) {
CHECK(0 <= arg_index && arg_index < spec->num_args);
task_arg *arg = &spec->args_and_returns[arg_index];
CHECK(arg->type == ARG_BY_VAL);
return arg->value.length;
}

int64_t task_args_add_ref(task_spec *spec, object_id obj_id) {
task_arg *arg = &spec->args_and_returns[spec->arg_index];
arg->type = ARG_BY_REF;
arg->obj_id = obj_id;
return spec->arg_index++;
}

int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length) {
task_arg *arg = &spec->args_and_returns[spec->arg_index];
arg->type = ARG_BY_VAL;
arg->value.offset = spec->args_value_offset;
arg->value.length = length;
uint8_t *addr = task_arg_val(spec, spec->arg_index);
CHECK(spec->args_value_offset + length <= spec->args_value_size);
CHECK(spec->arg_index != spec->num_args - 1 ||
spec->args_value_offset + length == spec->args_value_size);
memcpy(addr, data, length);
spec->args_value_offset += length;
return spec->arg_index++;
}

object_id *task_return(task_spec *spec, int64_t ret_index) {
CHECK(0 <= ret_index && ret_index < spec->num_returns);
task_arg *ret = &spec->args_and_returns[spec->num_args + ret_index];
CHECK(ret->type == ARG_BY_REF); /* No memory corruption. */
return &ret->obj_id;
}

void free_task_spec(task_spec *spec) {
CHECK(spec->arg_index == spec->num_args); /* Task was fully constructed */
free(spec);
}
53 changes: 53 additions & 0 deletions task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* This API specifies the task data structure. It is in C so we can
* easily construct tasks from other languages like Python. The datastructures
* are also defined in such a way that memory is contiguous and all pointers
* are relative, so that we can memcpy the datastructure and ship it over the
* network without serialization and deserialization. */

#include <stddef.h>
#include <stdint.h>
#include "common.h"

typedef unique_id function_id;
typedef unique_id object_id;

typedef struct task_spec_impl task_spec;

/* If argument is passed by value or reference. */
enum arg_type { ARG_BY_REF, ARG_BY_VAL };

/* Construct and modify task specifications. */

/* Allocating and initializing a task. */
task_spec *alloc_task_spec(function_id func_id,
int64_t num_args,
int64_t num_returns,
int64_t args_value_size);

/* Getting the number of arguments and returns. */
int64_t task_num_args(task_spec *spec);
int64_t task_num_returns(task_spec *spec);

/* Getting task arguments. */
int8_t task_arg_type(task_spec *spec, int64_t arg_index);
unique_id *task_arg_id(task_spec *spec, int64_t arg_index);
uint8_t *task_arg_val(task_spec *spec, int64_t arg_index);
int64_t task_arg_length(task_spec *spec, int64_t arg_index);

/* Setting task arguments. Note that this API only allows you to set the
* arguments in their order of appearance. */
int64_t task_args_add_ref(task_spec *spec, object_id obj_id);
int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length);

/* Getting and setting return arguments. Tasks return by reference for now. */
unique_id *task_return(task_spec *spec, int64_t ret_index);

/* Freeing the task datastructure. */
void free_task_spec(task_spec *spec);

/* Write the task specification to a file or socket. */
int send_task(int fd, task_spec *spec);

/* Read the task specification from a file or socket. It is the user's
* responsibility to free the task after it has been used. */
task_spec *recv_task(int fd);
54 changes: 54 additions & 0 deletions test/task_tests.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "greatest.h"

#include "task.h"

SUITE(task_tests);

TEST task_test(void) {
function_id func_id = {
{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}};
task_spec* task = alloc_task_spec(func_id, 4, 2, 10);
ASSERT(task_num_args(task) == 4);
ASSERT(task_num_returns(task) == 2);

unique_id arg1 = {
{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}};
ASSERT(task_args_add_ref(task, arg1) == 0);
ASSERT(task_args_add_val(task, (uint8_t*) "hello", 5) == 1);
unique_id arg2 = {
{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}};
ASSERT(task_args_add_ref(task, arg2) == 2);
ASSERT(task_args_add_val(task, (uint8_t*) "world", 5) == 3);

unique_id ret0 = {
{4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4}};
unique_id ret1 = {
{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5}};
memcpy(task_return(task, 0), &ret0, sizeof(ret0));
memcpy(task_return(task, 1), &ret1, sizeof(ret1));

ASSERT(memcmp(task_arg_id(task, 0), &arg1, sizeof(arg1)) == 0);
ASSERT(memcmp(task_arg_val(task, 1), (uint8_t*) "hello",
task_arg_length(task, 1)) == 0);
ASSERT(memcmp(task_arg_id(task, 2), &arg2, sizeof(arg2)) == 0);
ASSERT(memcmp(task_arg_val(task, 3), (uint8_t*) "world",
task_arg_length(task, 3)) == 0);

ASSERT(memcmp(task_return(task, 0), &ret0, sizeof(unique_id)) == 0);
ASSERT(memcmp(task_return(task, 1), &ret1, sizeof(unique_id)) == 0);

free_task_spec(task);
PASS();
}

SUITE(task_tests) {
RUN_TEST(task_test);
}

GREATEST_MAIN_DEFS();

int main(int argc, char** argv) {
GREATEST_MAIN_BEGIN();
RUN_SUITE(task_tests);
GREATEST_MAIN_END();
}

0 comments on commit 0b7d81c

Please sign in to comment.