Skip to content

Commit

Permalink
old unbuildable version of iqueue
Browse files Browse the repository at this point in the history
Interal commit 1c3eb04 in ts_dma_transport

This is added just to give some initial history, in case it's useful
to understand how and why things have changed.  This doesn't build on
its own.
  • Loading branch information
catern committed Sep 23, 2021
1 parent 4b8f20d commit 6227bf1
Show file tree
Hide file tree
Showing 8 changed files with 6,147 additions and 0 deletions.
173 changes: 173 additions & 0 deletions include/iqsync.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#ifndef _dma_transport_iqsync_h_
#define _dma_transport_iqsync_h_

/** \file
* iqsync magic constants for both the ssh and udp versions.
*/
#include "twosigma.h"
#include <unistd.h>
#include <pthread.h>
#include <stdint.h>
#include "iqueue.h"
#include "tslock.h"


#define IQSYNC_HANDSHAKE_MAGIC 0x495148414E440005

/** Send at the start of TCP connection or every few seconds by
* the multicast version.
*/
struct iqsync_handshake
{
uint64_t magic;
uint64_t creation;
uint64_t entries;
uint64_t hdr_len;
uint8_t hdr[];
} __attribute__((packed));


/** Sent as a resend request to please replay from the
* starting index. The replay will start from the specified
* index in the remote iqueue and will be sequenced with the
* desired sequence number.
*
* This is never sent to the multicast group; unicast only.
*/
#define IQSYNC_START_MAGIC 0x4951535452540003

struct iqsync_start
{
uint64_t magic;
uint64_t start_seq;
uint64_t start_index;
uint64_t flags;
} __attribute__((packed));


/** Sent at the front of each packet, either unicast or multicast */
#define IQSYNC_DATA_MAGIC 0x4951444154410004

struct iqsync_data
{
uint64_t magic;
uint64_t src; // sending source
uint64_t iq_index; // sending index
uint64_t orig_src; // original source (for cycle detection)
uint64_t orig_index; // original source
uint32_t len;
uint8_t data[];
} __attribute__((packed));


/** Update of heartbeats that have changed since the last heartbeat */
#define IQSYNC_HEARTBEAT_MAGIC 0x4951444154410005
struct iqsync_heartbeat
{
uint64_t magic_be64;
uint64_t count_be64;
shash_entry_t writers[];
} __attribute__((packed));


/** Shadow the actual values for an iqueue into a temporary for
* safe keeping.
*/
typedef struct
{
const char * name;
uint64_t index;
uint64_t creation;
uint64_t entries;
uint64_t hdr_len;
uint64_t count; // msgs received
uint64_t len; // bytes received
void * hdr;
} iqsync_shadow_t;


/** Book keeping and options for the iqsync process.
* This is not the easiest structure to use for outside processes;
* it will likely have some significant rework if any other
* applications want to use the iqsync algorithm.
*/
typedef struct
{
int read_fd;
int write_fd;

int do_clone;
int do_tail;
int do_push;
int do_pull;
int do_hdr_validate;
int do_server;
int do_prefetch;
int do_syncbehind;
volatile int do_shutdown;
int usleep_time;
int verbose;
uint64_t report_interval;
uint64_t rate_limit; // in MB/s
uint64_t avg_msg_len; // in bytes

iqueue_t * iq;
bool close_iq_on_shutdown;

shash_t * sources;
shash_entry_t * scan_index;

tslock_t * heartbeats_lock;
shash_t * heartbeats_hash;
shash_entry_t * heartbeats;
shash_entry_t * heartbeats_copy;
struct iqsync_heartbeat * heartbeat_msg;
unsigned heartbeats_max;

uint64_t start_time;
uint64_t report_time;
uint64_t report_tx_count;
uint64_t report_rx_count;
uint64_t report_rx_len;
int warned_cycle;

const char * local_cpu;
const char * remote_cpu;
pthread_t push_thread;
pthread_t pull_thread;
pthread_t stat_thread;
pthread_t prefetch_thread;
pthread_t syncbehind_thread;

iqsync_shadow_t remote;
iqsync_shadow_t local;
} iqsync_t;


int
iqsync_start(
iqsync_t * iqsync
);


/** Wait for the push/pull threads to exit.
* \note Does not close the iqsync->iq iqueue.
*/
int
iqsync_wait(
iqsync_t * iqsync
);


void
iqsync_stats(
iqsync_t * iqsync
);

const struct iqsync_data *
iqsync_data_msg(
iqueue_t * const iq,
const uint64_t offset
);

#endif
Loading

0 comments on commit 6227bf1

Please sign in to comment.