Add std_streams connector and some more fixes.
Signed-off-by: Matthias Wahl <[email protected]>
Matthias Wahl committed Aug 18, 2021
1 parent 185a121 commit 2885548
Showing 12 changed files with 282 additions and 9 deletions.
6 changes: 6 additions & 0 deletions src/connectors.rs
@@ -38,6 +38,9 @@ pub(crate) mod pb;
/// tcp server connector impl
pub(crate) mod tcp_server;

/// std streams connector (stdout, stderr, stdin)
pub(crate) mod std_streams;

/// Home of the famous metrics collector
pub(crate) mod metrics;

@@ -736,5 +739,8 @@ pub async fn register_builtin_connectors(world: &World) -> Result<()> {
world
.register_builtin_connector_type("tcp_server", Box::new(tcp_server::Builder {}))
.await?;
world
.register_builtin_connector_type("std_stream", Box::new(std_streams::Builder {}))
.await?;
Ok(())
}
10 changes: 8 additions & 2 deletions src/connectors/prelude.rs
@@ -12,9 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use crate::connectors::sink::{Sink, SinkContext, SinkData};
pub use crate::connectors::source::{Source, SourceContext, SourceReply};
pub use crate::connectors::reconnect::ConnectionLostNotifier;
pub use crate::connectors::sink::{
EventSerializer, ResultVec, Sink, SinkAddr, SinkContext, SinkData, SinkManagerBuilder,
};
pub use crate::connectors::source::{
Source, SourceAddr, SourceContext, SourceManagerBuilder, SourceReply,
};
pub use crate::connectors::{Connector, ConnectorBuilder, ConnectorContext, ConnectorState};
pub use crate::errors::{Error, ErrorKind, Result};
pub use crate::url::TremorUrl;
pub use crate::OpConfig;
pub use tremor_pipeline::ConfigImpl;
1 change: 1 addition & 0 deletions src/connectors/sink.rs
@@ -696,6 +696,7 @@ where
);
self.pipelines.retain(|(url, _)| url != &id);
}
// FIXME: only handle those if in the right state (see source part)
SinkMsg::Start => self.sink.on_start(&mut self.ctx).await,
SinkMsg::Resume => {
self.paused = false;
3 changes: 2 additions & 1 deletion src/connectors/source.rs
@@ -748,7 +748,8 @@
}
Err(e) => {
warn!("[Source::{}] Error pulling data: {}", &self.ctx.url, e);
// TODO: increment metrics err
// TODO: increment error metric
// FIXME: emit event to err port
}
}
pull_counter += 1;
221 changes: 221 additions & 0 deletions src/connectors/std_streams.rs
@@ -0,0 +1,221 @@
// Copyright 2021, The Tremor Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::connectors::prelude::*;
use crate::utils::hostname;
use async_std::io::{stderr, stdin, stdout, ReadExt, Stderr, Stdin, Stdout, Write};
use futures::AsyncWriteExt;
use tremor_pipeline::{EventOriginUri, DEFAULT_STREAM_ID};

#[derive(Deserialize, Debug, Clone)]
// accept lowercase stream names like "stdout" from configs (matches the system::stdout artefact in src/system.rs)
#[serde(rename_all = "lowercase")]
pub enum StdStream {
Stdout,
Stderr,
Stdin,
}

#[derive(Deserialize, Debug, Clone)]
pub struct Config {
stream: StdStream,
#[serde(default = "Default::default")]
prefix: String,
/// print non-string payloads as raw bytes, not in debug formatting
#[serde(default = "Default::default")]
raw: bool,
}

impl ConfigImpl for Config {}

/// connector handling 1 std stream (stdout, stderr or stdin)
pub struct StdStreamConnector {
stream: StdStream,
prefix: String,
raw: bool,
}

pub(crate) struct Builder {}
impl ConnectorBuilder for Builder {
fn from_config(
&self,
_id: &TremorUrl,
raw_config: &Option<OpConfig>,
) -> Result<Box<dyn Connector>> {
if let Some(raw) = raw_config {
let config = Config::new(raw)?;
Ok(Box::new(StdStreamConnector {
stream: config.stream,
prefix: config.prefix,
raw: config.raw,
}))
} else {
Err(ErrorKind::MissingConfiguration(String::from("std_stream")).into())
}
}
}

/// stdstream source (stdin)
pub struct StdStreamSource {
stream: Stdin,
buffer: Vec<u8>,
origin_uri: EventOriginUri,
}

impl StdStreamSource {
const INPUT_SIZE_BYTES: usize = 8192;

fn new() -> Self {
Self {
stream: stdin(),
buffer: vec![0; Self::INPUT_SIZE_BYTES],
origin_uri: EventOriginUri {
scheme: "tremor-stdin".to_string(),
host: hostname(),
port: None,
path: vec![],
},
}
}
}

#[async_trait::async_trait()]
impl Source for StdStreamSource {
async fn pull_data(&mut self, _pull_id: u64, _ctx: &SourceContext) -> Result<SourceReply> {
let len = self.stream.read(&mut self.buffer).await?;
if len == 0 {
// reached the end of stdin
// FIXME: initiate state change to stop this source
Ok(SourceReply::Empty(1000))
} else {
Ok(SourceReply::Data {
origin_uri: self.origin_uri.clone(),
// ALLOW: len cannot be > INPUT_SIZE_BYTES
data: self.buffer[0..len].to_vec(),
meta: None,
stream: DEFAULT_STREAM_ID,
})
}
}

fn is_transactional(&self) -> bool {
false
}
}

/// stdstream sink
pub struct StdStreamSink<T>
where
T: Write + std::marker::Unpin + Send,
{
stream: T,
prefix: String,
raw: bool,
}

#[async_trait::async_trait()]
impl<T> Sink for StdStreamSink<T>
where
T: Write + std::marker::Unpin + Send,
{
async fn on_event(
&mut self,
_input: &str,
event: tremor_pipeline::Event,
_ctx: &SinkContext,
serializer: &mut EventSerializer,
_start: u64,
) -> ResultVec {
for (value, _meta) in event.value_meta_iter() {
let data = serializer.serialize(value, event.ingest_ns)?;
for chunk in data {
self.stream.write_all(self.prefix.as_bytes()).await?;
if self.raw {
self.stream.write_all(&chunk).await?;
} else if let Ok(s) = std::str::from_utf8(&chunk) {
self.stream.write_all(s.as_bytes()).await?;
} else {
self.stream
.write_all(format!("{:?}", &chunk).as_bytes())
.await?
}
self.stream.write_all(b"\n").await?
}
}
self.stream.flush().await?;
Ok(vec![])
}

fn auto_ack(&self) -> bool {
true
}
}

#[async_trait::async_trait()]
impl Connector for StdStreamConnector {
async fn connect(
&mut self,
_ctx: &ConnectorContext,
_notifier: ConnectionLostNotifier,
) -> Result<bool> {
Ok(true)
}

/// create sink if we have a stdout or stderr stream
async fn create_sink(
&mut self,
sink_context: SinkContext,
builder: SinkManagerBuilder,
) -> Result<Option<SinkAddr>> {
let addr = match self.stream {
StdStream::Stdout => {
let sink: StdStreamSink<Stdout> = StdStreamSink {
stream: stdout(),
prefix: self.prefix.clone(),
raw: self.raw,
};
builder.spawn(sink, sink_context)?
}
StdStream::Stderr => {
let sink: StdStreamSink<Stderr> = StdStreamSink {
stream: stderr(),
prefix: self.prefix.clone(),
raw: self.raw,
};
builder.spawn(sink, sink_context)?
}
StdStream::Stdin => return Ok(None),
};
Ok(Some(addr))
}

async fn create_source(
&mut self,
source_context: SourceContext,
builder: SourceManagerBuilder,
) -> Result<Option<SourceAddr>> {
Ok(if let StdStream::Stdin = self.stream {
let source = StdStreamSource::new();
let addr = builder.spawn(source, source_context)?;
Some(addr)
} else {
None
})
}

async fn on_start(&mut self, _ctx: &ConnectorContext) -> Result<ConnectorState> {
Ok(ConnectorState::Running)
}

fn default_codec(&self) -> &str {
"json"
}
}
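With the type registered as "std_stream" in src/connectors.rs, a connector artefact can reference it from YAML. A minimal sketch (the id is hypothetical; field names come from the Config struct above, the artefact shape from the system::stdout registration in src/system.rs below; prefix and raw fall back to their serde defaults when omitted):

    # hypothetical artefact using the new std_stream connector type
    id: my_stdout
    type: std_stream
    config:
      stream: stdout      # one of stdout, stderr, stdin
      prefix: "tremor> "  # prepended to every serialized chunk
      raw: false          # true writes non-UTF-8 payloads as raw bytes instead of debug-formatting them

With stream set to stdin the connector only exposes a source and no sink, per create_sink/create_source above.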
35 changes: 35 additions & 0 deletions src/system.rs
@@ -46,6 +46,21 @@ lazy_static! {
//ALLOW: We want this to panic, it only happens at startup time
.expect("Failed to initialize id for metrics connector")
};
pub(crate) static ref STDOUT_CONNECTOR: TremorUrl = {
TremorUrl::parse("/connector/system::stdout/system/in")
//ALLOW: We want this to panic, it only happens at startup time
.expect("Failed to initialize id for stdout connector")
};
pub(crate) static ref STDERR_CONNECTOR: TremorUrl = {
TremorUrl::parse("/connector/system::stderr/system/in")
//ALLOW: We want this to panic, it only happens at startup time
.expect("Failed to initialize id for stderr connector")
};
pub(crate) static ref STDIN_CONNECTOR: TremorUrl = {
TremorUrl::parse("/connector/system::stdin/system/out")
//ALLOW: We want this to panic, it only happens at startup time
.expect("Failed to initialize id for stderr connector")
};
pub(crate) static ref METRICS_PIPELINE: TremorUrl = {
TremorUrl::parse("/pipeline/system::metrics/system/in")
//ALLOW: We want this to panic, it only happens at startup time
@@ -988,6 +1003,26 @@ type: metrics
.publish_pipeline(&PASSTHROUGH_PIPELINE, true, artefact_passthrough)
.await?;

// Register stdout connector
let artefact: ConnectorArtefact = serde_yaml::from_str(
r#"
id: system::stdout
type: std_stream
config:
stream: stdout
"#,
)?;
self.repo
.publish_connector(&STDOUT_CONNECTOR, true, artefact)
.await?;
self.bind_connector(&STDOUT_CONNECTOR).await?;
self.reg
.find_connector(&STDOUT_CONNECTOR)
.await?
.ok_or_else(|| Error::from("Failed to initialize system::stdout connector"))?;

// FIXME: stderr and stdin connectors

// Register stdout offramp
let artefact: OfframpArtefact = serde_yaml::from_str(
r#"
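The FIXME above defers the stderr and stdin registrations; mirroring the stdout block, the stderr half might look roughly like the following (a sketch of a possible follow-up, not part of this commit):

    // hypothetical follow-up mirroring the system::stdout registration above
    let artefact: ConnectorArtefact = serde_yaml::from_str(
        r#"
    id: system::stderr
    type: std_stream
    config:
      stream: stderr
    "#,
    )?;
    self.repo
        .publish_connector(&STDERR_CONNECTOR, true, artefact)
        .await?;
    self.bind_connector(&STDERR_CONNECTOR).await?;
    self.reg
        .find_connector(&STDERR_CONNECTOR)
        .await?
        .ok_or_else(|| Error::from("Failed to initialize system::stderr connector"))?;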
7 changes: 7 additions & 0 deletions src/version.rs
@@ -55,6 +55,13 @@ pub fn log() {
info!("rd_kafka version: 0x{:08x}, {}", version_n, version_s);
}

/// Gets the librdkafka version string
#[must_use]
pub fn rdkafka() -> String {
let (_, version) = get_rdkafka_version();
version
}

#[cfg(test)]
mod test {
use super::*;
2 changes: 1 addition & 1 deletion tests/query_runtime_error.rs
@@ -83,7 +83,7 @@ macro_rules! test_cases {
let mut results = Vec::new();
for (id, json) in in_json.into_iter().enumerate() {
let event = Event {
id: EventId::new(0, 0, (id as u64)),
id: EventId::from_id(0, 0, (id as u64)),
data: json.clone_static().into(),
ingest_ns: id as u64,
..Event::default()
2 changes: 0 additions & 2 deletions tests/script.rs
@@ -57,8 +57,6 @@ macro_rules! test_cases {
path: vec!["snot".into()],
port: Some(23),
scheme: "snot".into(),
uid: 42

};
let context = EventContext::new(id as u64, Some(&uri));
let mut meta = Value::from(Object::default());
1 change: 0 additions & 1 deletion tremor-cli/src/env.rs
@@ -31,7 +31,6 @@ pub(crate) fn setup() -> Result<TremorCliEnv> {

// Install runtime extensions from a single source of truth
tremor_runtime::functions::install(&mut fun)?;
tremor_connectors::connectors::functions::install(&mut fun);

Ok(TremorCliEnv {
module_path,
2 changes: 1 addition & 1 deletion tremor-cli/src/run.rs
@@ -348,7 +348,7 @@ fn run_trickle_source(matches: &ArgMatches, src: String) -> Result<()> {
if let Err(e) = runnable.enqueue(
"in",
Event {
id: EventId::new(0, 0, *id),
id: EventId::from_id(0, 0, *id),
data: value.clone(),
ingest_ns: at,
..Event::default()
1 change: 0 additions & 1 deletion tremor-cli/src/server.rs
@@ -119,7 +119,6 @@ pub(crate) async fn run_dun(matches: &ArgMatches) -> Result<()> {
.map(std::string::ToString::to_string);
// TODO: Allow configuring this for offramps and pipelines
let (world, handle) = World::start(64, storage_directory).await?;
tremor_connectors::register_builtins(&world).await?;

if let Some(config_files) = matches.values_of("artefacts") {
let mut yaml_files = Vec::with_capacity(16);
