Skip to content

Commit

Permalink
initial exported version of iqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
catern committed Sep 23, 2021
1 parent 6227bf1 commit 1f0515c
Show file tree
Hide file tree
Showing 87 changed files with 14,436 additions and 2,029 deletions.
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use_nix
41 changes: 41 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
iqueue
*.Po
*.o
*.lo
*.la
config.h.in
Makefile.in
Makefile
src/iqsync
src/iqueue
src/libiqueue.a
/aclocal.m4
/ar-lib
/autom4te.cache/
/compile
/config.h
/config.status
/configure
/depcomp
/install-sh
/missing
/stamp-h1
/build-aux
/m4
/.libs
/libtool
iqsync
/iqueue.pc
.deps
.dirstamp
/iqmod_copy
/iqmod_inplace
*.log
*.a
*.trs
/*ctest
/dump_iqueue
/in2iqueue
/iqueue2out
/wait_for_heartbeat
/iqueue_tail_count
13 changes: 13 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Copyright 2021 Two Sigma Open Source, LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
90 changes: 90 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
## Process this file with automake to produce Makefile.in
AM_CFLAGS = -g -Og -Wall -Wextra -Werror -pthread
AM_CXXFLAGS = $(AM_CFLAGS) -std=gnu++17
AM_CPPFLAGS = -I$(srcdir)/include -I$(srcdir)/src

# Library
pkgconfig_DATA = iqueue.pc
lib_LTLIBRARIES = libiqueue.la

libiqueue_la_SOURCES = src/iqueue.c src/iqsync.c \
src/math_utils.h src/math_utils.c \
src/net_utils.h src/net_utils.c \
src/proc_utils.h src/proc_utils.c \
src/io_utils.h src/io_utils.c \
src/shash.c \
src/tsgosmacs.h src/tsflexhash.h src/tsflexhash_private.h src/tsflexhash.c \
src/tslog.h src/tslog.c \
src/tsassert.h src/tsassert.c \
src/twosigma.h \
src/tsclock.h \
src/tstl.h src/tsdir.h \
src/try_unix.hh \
src/tstl.c src/tsdir.c src/getlogstr.c \
src/container_of.h \
src/tslock.h src/iqmod_common.h
include_HEADERS = include/iqueue.h include/iqsync.h include/shash.h include/iqueue.hh include/stringer.hh

AM_LDFLAGS = -lbsd
# Programs
bin_PROGRAMS = iqueue iqsync wait_for_heartbeat iqueue_tail_count iqmod_copy iqmod_inplace in2iqueue iqueue2out dump_iqueue

iqueue_SOURCES = src/iqueue-main.c
iqueue_LDADD = libiqueue.la

iqsync_SOURCES = src/iqsync-main.c
iqsync_LDADD = libiqueue.la -ldl

wait_for_heartbeat_SOURCES = src/wait_for_heartbeat.cc
wait_for_heartbeat_LDADD = libiqueue.la

iqueue_tail_count_SOURCES = src/iqueue_tail_count.cc
iqueue_tail_count_LDADD = libiqueue.la

iqmod_copy_SOURCES = src/iqmod_copy-main.c
iqmod_copy_LDADD = libiqueue.la

iqmod_inplace_SOURCES = src/iqmod_inplace-main.c
iqmod_inplace_LDADD = libiqueue.la

in2iqueue_SOURCES = src/in2iqueue.cc
in2iqueue_LDADD = libiqueue.la

iqueue2out_SOURCES = src/iqueue2out.cc
iqueue2out_LDADD = libiqueue.la

dump_iqueue_SOURCES = src/dump_iqueue.cc
dump_iqueue_LDADD = libiqueue.la

# Tests
check_LIBRARIES = libtstest.a
libtstest_a_SOURCES = test/ctest.h test/ctest_main.h test/ctest_resource.c test/ctest_resource.h

check_PROGRAMS = iqueue_reopen_ctest grow_ctest iqueue_try_update_ctest iqueue_writer_ctest \
iqueue_symlink_ctest iqueue_allocator_ctest unlink_ctest iqmod_ctest
iqueue_reopen_ctest_SOURCES = test/iqueue_reopen_ctest.c
iqueue_reopen_ctest_LDADD = libiqueue.la libtstest.a

grow_ctest_SOURCES = test/grow_ctest.c
grow_ctest_LDADD = libiqueue.la libtstest.a

iqueue_try_update_ctest_SOURCES = test/iqueue_try_update_ctest.c
iqueue_try_update_ctest_LDADD = libiqueue.la libtstest.a

iqueue_writer_ctest_SOURCES = test/iqueue_writer_ctest.c
iqueue_writer_ctest_LDADD = libiqueue.la libtstest.a

iqueue_symlink_ctest_SOURCES = test/iqueue_symlink_ctest.c
iqueue_symlink_ctest_LDADD = libiqueue.la libtstest.a

# Make sure TMPDIR is set to a place with lots of space for this one
iqueue_allocator_ctest_SOURCES = test/iqueue_allocator_ctest.c
iqueue_allocator_ctest_LDADD = libiqueue.la libtstest.a

unlink_ctest_SOURCES = test/unlink_ctest.c
unlink_ctest_LDADD = libiqueue.la libtstest.a

iqmod_ctest_SOURCES = test/iqmod_ctest.c
iqmod_ctest_LDADD = libiqueue.la libtstest.a

TESTS = $(check_PROGRAMS)
14 changes: 14 additions & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Indexed queue.

A persistent, connectionless, message-based transport.

- Stored in a single file
- No additional processes
- No setup required beyond opening the file and beginning to read or write
- Multi-reader, each reader gets every message
- Multi-writer, writes are atomic and persistent


NOTE:

iqueue fails with a SIGBUS when it runs out of disk-space.
17 changes: 17 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
AC_INIT([iqueue], [0.1.0], [[email protected]])
AC_CONFIG_AUX_DIR([build-aux])
AC_CONFIG_MACRO_DIRS([m4])
AM_INIT_AUTOMAKE([-Wall -Werror foreign subdir-objects])
AC_PROG_CC
AC_PROG_CXX
AM_PROG_AR
LT_INIT
dnl workaround for https://github.com/kimwalisch/primesieve/issues/16
AC_SUBST(AR_FLAGS, [cr])
PKG_INSTALLDIR
AC_CONFIG_HEADERS([config.h])
AC_CONFIG_FILES([
Makefile
iqueue.pc
])
AC_OUTPUT
7 changes: 7 additions & 0 deletions default.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
with import <nixpkgs> {};

stdenv.mkDerivation rec {
name = "iqueue";
src = ./.;
buildInputs = [ microsoft_gsl autoconf automake libtool pkgconfig libbsd autoreconfHook ];
}
92 changes: 87 additions & 5 deletions include/iqsync.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
/*
* Copyright 2021 Two Sigma Open Source, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _dma_transport_iqsync_h_
#define _dma_transport_iqsync_h_

/** \file
* iqsync magic constants for both the ssh and udp versions.
*/
#include "twosigma.h"
#include <unistd.h>
#include <pthread.h>
#include <stdint.h>
#include "iqueue.h"
#include "tslock.h"


#define IQSYNC_HANDSHAKE_MAGIC 0x495148414E440005
#define IQSYNC_HANDSHAKE_MAGIC 0x495148414E440005

/** Send at the start of TCP connection or every few seconds by
* the multicast version.
Expand Down Expand Up @@ -86,30 +97,64 @@ typedef struct
} iqsync_shadow_t;


/**
* iqsync filtering function, which will be called on every outbound
* local iqueue message before it is sent to the remote side (push)
* Valid return codes are:
* 1: Forward the message
* 0: Skip the message
* -1: Stop processing futher messages and exit
*/
typedef int (*iqsync_filter_fn_t)(void *handle, const void *buf, size_t len);
typedef int (*iqsync_filter_setup_fn_t)(
const void *buf,
size_t len,
void **filter_fn_priv,
iqsync_filter_fn_t *filter_fn);

typedef struct
{
iqsync_filter_setup_fn_t filter_setup;
iqsync_filter_fn_t filter_fn;

void *filter_fn_priv;
} iqsync_filter_t;

/** Book keeping and options for the iqsync process.
* This is not the easiest structure to use for outside processes;
* it will likely have some significant rework if any other
* applications want to use the iqsync algorithm.
*/
#define DEFAULT_RECVBUFFER_SIZE (1 << 20) // 1MB
#define DEFAULT_SENDBUFFER_SIZE (1 << 12) // 4KB

struct extremely_dangerous_internal_tslock_s;
typedef struct extremely_dangerous_internal_tslock_s tslock_t;
typedef struct
{
int read_fd;
int write_fd;

int do_clone;
int do_clone_push;
int do_tail;
int do_push;
int do_pull;
int do_hdr_validate;
int do_server;
int do_prefetch;
int do_syncbehind;
int use_sendbuffer;
int use_recvbuffer;

volatile int do_shutdown;
int usleep_time;
int verbose;
int quiet;
uint64_t report_interval;
uint64_t rate_limit; // in MB/s
uint64_t avg_msg_len; // in bytes
uint64_t connection_timeout_sec;

iqueue_t * iq;
bool close_iq_on_shutdown;
Expand Down Expand Up @@ -139,11 +184,48 @@ typedef struct
pthread_t prefetch_thread;
pthread_t syncbehind_thread;

pthread_mutex_t stats_shutdown_mutex;
pthread_cond_t stats_shutdown_cond;

iqsync_shadow_t remote;
iqsync_shadow_t local;

unsigned filter_count;
iqsync_filter_t *filters;

int64_t initialization_rc;
int wait_complete;

uint32_t recvbuffer_len;
uint8_t recvbuffer_block_shift;
uint64_t recvbuffer_offset_mask;
uint32_t sendbuffer_len;
uint8_t sendbuffer_block_shift;
uint64_t sendbuffer_offset_mask;

/**
* internal buffer used to store packets
*/
volatile uint64_t recvbuffer_read_idx;
volatile uint64_t recvbuffer_write_idx;
char *recvbuffer;

uint32_t sendbuffer_data_len;
char *sendbuffer;

} iqsync_t;


int
iqsync_start_async(
iqsync_t * iqsync
);

int
iqsync_start_async_wait(
iqsync_t * iqsync
);

int
iqsync_start(
iqsync_t * iqsync
Expand Down
Loading

0 comments on commit 1f0515c

Please sign in to comment.