Skip to content

Commit

Permalink
Friday evening state
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wahl <[email protected]>
  • Loading branch information
Matthias Wahl committed Jun 18, 2021
1 parent 0688b8c commit 7ca34a3
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 74 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 35 additions & 34 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,28 @@ opt-level = 3
anyhow = "1"
async-channel = "1"
async-compat = "0.2"
async-compression = { version = "0.3", features = ["xz", "futures-bufread", "stream"] }
async-std = { version = "1.9.0", features = ["unstable", "attributes", "tokio03", "tokio1"] }
async-compression = {version = "0.3", features = ["xz", "futures-bufread", "stream"]}
async-std = {version = "1.9.0", features = ["unstable", "attributes", "tokio03", "tokio1"]}
async-std-resolver = "0.20"
async-tls = "0.11"
async-trait = "0.1"
async-tungstenite = { version = "0.13.1", features = ["async-std-runtime"] }
async-tungstenite = {version = "0.13.1", features = ["async-std-runtime"]}
base64 = "0.13"
beef = { version = "0.5", features = ["impl_serde"] }
beef = {version = "0.5", features = ["impl_serde"]}
bimap = "0.6"
byteorder = "1"
bytes = "1.0"
chrono = "0.4"
either = { version = "1.6", features = ["serde"] }
either = {version = "1.6", features = ["serde"]}
elastic = "0.21.0-pre.5"
error-chain = "0.12"
futures = "0.3.15"
glob = "0.3"
halfbrown = "0.1"
hashbrown = { version = "0.11", features = ["serde"] }
hashbrown = {version = "0.11", features = ["serde"]}
hostname = "0.3"
http-types = "2.11"
indexmap = { version = "1", features = ["serde-1"] }
indexmap = {version = "1", features = ["serde-1"]}
lapin = "1.7.1"
lazy_static = "1"
libflate = "1.1"
Expand All @@ -63,39 +65,38 @@ rand = "0.8"
regex = "1.4"
rental = "0.5"
rmp-serde = "0.15"
rustls = "0.19"
serde = "1"
serde_derive = "1"
serde_yaml = "0.8"
simd-json = { version = "0.4", features = ["known-key"] }
simd-json = {version = "0.4", features = ["known-key"]}
simd-json-derive = "0.2"
snap = "1"
surf = "=2.2.0"
syslog_loose = "0.10"
tremor-common = { path = "tremor-common" }
tremor-influx = { path = "tremor-influx" }
tremor-pipeline = { path = "tremor-pipeline" }
tremor-script = { path = "tremor-script" }
tremor-value = { path = "tremor-value" }
tremor-common = {path = "tremor-common"}
tremor-influx = {path = "tremor-influx"}
tremor-pipeline = {path = "tremor-pipeline"}
tremor-script = {path = "tremor-script"}
tremor-value = {path = "tremor-value"}
url = "2.2"
value-trait = "0.2"
rustls = "0.19"
async-tls = "0.11"

mapr = "0.8"
tempfile = { version = "3.2" }
tempfile = {version = "3.2"}

# blaster / blackhole
hdrhistogram = "7"
xz2 = "0.1"

# postgres
postgres = { version = "0.19", features = ["with-serde_json-1", "with-chrono-0_4"] }
postgres = {version = "0.19", features = ["with-serde_json-1", "with-chrono-0_4"]}
postgres-protocol = "0.6"
tokio-postgres = "0.7"

# kafka. cmake is the encouraged way to build this and also the one that works on windows/with musl.
rdkafka = { version = "0.26", features = ["cmake-build", "libz-static"], default-features = false }
rdkafka-sys = { version = "4.0.0", features = ["cmake-build", "libz-static"] } # tracking the version rdkafka depends on
rdkafka = {version = "0.26", features = ["cmake-build", "libz-static"], default-features = false}
rdkafka-sys = {version = "4.0.0", features = ["cmake-build", "libz-static"]}# tracking the version rdkafka depends on
smol = "1.2.5"

# crononome
Expand All @@ -106,7 +107,7 @@ grok = "1"

# not used directly in tremor codebase, but present here so that we can turn
# on features for these (see static-ssl feature here)
openssl = { version = "0.10", features = ["vendored"] }
openssl = {version = "0.10", features = ["vendored"]}

# rest onramp
tide = "0.16"
Expand All @@ -115,25 +116,25 @@ tide = "0.16"
async-nats = "0.9.18"

# discord
serenity = { version = "0.10", default-features = false, features = [
serenity = {version = "0.10", default-features = false, features = [
"client",
"gateway",
"rustls_backend",
"model",
"cache",
] }
]}

# kv
sled = "0.34"

# opentelemetry
port_scanner = "0.1.5"
tonic = { version = "0.4", default-features = false, features = ["transport", "tls"] }
tonic = {version = "0.4", default-features = false, features = ["transport", "tls"]}
tremor-otelapis = "0.1"

# gcp
googapis = { version = "0.4.2", default-features = false, features = ["google-pubsub-v1"] }
gouth = { version = "0.2" }
googapis = {version = "0.4.2", default-features = false, features = ["google-pubsub-v1"]}
gouth = {version = "0.2"}
http = "0.2.4"
reqwest = "0.11.3"

Expand All @@ -158,8 +159,8 @@ default = []
bert = ["tremor-pipeline/bert"]

[patch.crates-io]
rust-bert = { git = 'https://github.com/mfelsche/rust-bert.git', rev = '1140989' }
rust_tokenizers = { git = 'https://github.com/mfelsche/rust-tokenizers.git', rev = '5a7860d' }
rust-bert = {git = 'https://github.com/mfelsche/rust-bert.git', rev = '1140989'}
rust_tokenizers = {git = 'https://github.com/mfelsche/rust-tokenizers.git', rev = '5a7860d'}

# for use during debian packaging, via cargo-deb
# https://github.com/mmstick/cargo-deb#packagemetadatadeb-options
Expand Down Expand Up @@ -210,17 +211,17 @@ package = "tremor"
buildflags = ["--release"]
profile = "release"
[package.metadata.rpm.targets]
tremor = { path = "/usr/bin/tremor" }
tremor = {path = "/usr/bin/tremor"}
# The LHS paths here are relative to the rpm config dir (.rpm at project root).
# If you add new files here, please make sure to add them to %files section in
# the rpm spec file (inside .rpm) -- otherwise the rpm packaging will fail.
[package.metadata.rpm.files]
"../LICENSE" = { path = "/usr/share/licenses/tremor/LICENSE" }
"../README.md" = { path = "/usr/share/doc/tremor/README.md" }
"../packaging/distribution/etc/tremor/" = { path = "/etc/tremor/" }
"../LICENSE" = {path = "/usr/share/licenses/tremor/LICENSE"}
"../README.md" = {path = "/usr/share/doc/tremor/README.md"}
"../packaging/distribution/etc/tremor/" = {path = "/etc/tremor/"}
# TODO enable this after some example cleanup
#"../demo/examples/" = { path = "/etc/tremor/config/examples/" }
"../packaging/distribution/usr/share/tremor/tremor.sh" = { path = "/usr/share/tremor/tremor.sh", mode = "755" }
"../tremor-script/lib/" = { path = "/usr/share/tremor/lib/" }
"../packaging/distribution/usr/share/tremor/tremor.sh" = {path = "/usr/share/tremor/tremor.sh", mode = "755"}
"../tremor-script/lib/" = {path = "/usr/share/tremor/lib/"}
# copying systemd service to standard location for rpm packages
"../packaging/distribution/etc/systemd/system/tremor.service" = { path = "/usr/lib/systemd/system/tremor.service" }
"../packaging/distribution/etc/systemd/system/tremor.service" = {path = "/usr/lib/systemd/system/tremor.service"}
3 changes: 3 additions & 0 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ impl Manager {
let mut reconnect: Reconnect = Reconnect::from(create.config.reconnect);
let mut connectivity = Connectivity::Disconnected;
let mut connector_state = ConnectorState::Stopped;
dbg!(connector_state);

///// create source instance
// channel for sending SourceReply to the source part of this connector
Expand Down Expand Up @@ -234,6 +235,7 @@ impl Manager {
};
let send_addr = addr.clone();
connector_state = ConnectorState::Initialized;
dbg!(connector_state);

task::spawn::<_, Result<()>>(async move {
// typical 1 pipeline connected to IN, OUT, ERR
Expand Down Expand Up @@ -402,6 +404,7 @@ impl Manager {
}
}

#[derive(Debug)]
pub(crate) enum ConnectorState {
Initialized,
Running,
Expand Down
1 change: 1 addition & 0 deletions src/connectors/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Reconnect {
}
}
self.attempt += 1;
// TODO: trait out next interval computation, to support different strategies
self.interval_ms = (self.interval_ms as f64 * self.config.growth_rate) as u64;

// spawn retry
Expand Down
Loading

0 comments on commit 7ca34a3

Please sign in to comment.