Skip to content

Commit

Permalink
feat: close producer when topic is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Aug 22, 2024
1 parent 790b7fd commit 88a6e8f
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fluvio-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ producer-file-io = ["fluvio-cli-common/file-records"]
[dependencies]
async-channel = { workspace = true }
async-trait = { workspace = true }
async-std = { workspace = true }
anyhow = { workspace = true }
bytesize = { workspace = true, features = ['serde'] }
clap = { workspace = true, features = ["std", "derive", "string", "help", "usage", "env", "error-context"] }
Expand Down
80 changes: 60 additions & 20 deletions crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ mod cmd {
#[cfg(feature = "producer-file-io")]
use std::path::PathBuf;

use async_std::io::stdin;
use async_trait::async_trait;
use fluvio_future::io::StreamExt;
use fluvio_sc_schema::message::MsgType;
use fluvio_sc_schema::topic::TopicSpec;
#[cfg(feature = "producer-file-io")]
use futures::future::join_all;
use clap::Parser;
use tokio::select;
use tracing::{error, warn};
use humantime::parse_duration;
use anyhow::Result;

use fluvio::{
Compression, Fluvio, FluvioError, TopicProducerPool, TopicProducerConfigBuilder, RecordKey,
ProduceOutput, DeliverySemantic, SmartModuleContextData, Isolation, SmartModuleInvocation,
Compression, DeliverySemantic, Fluvio, FluvioAdmin, FluvioError, Isolation, ProduceOutput,
RecordKey, SmartModuleContextData, SmartModuleInvocation, TopicProducerConfigBuilder,
TopicProducerPool,
};
use fluvio_extension_common::Terminal;
use fluvio_types::print_cli_ok;
Expand Down Expand Up @@ -243,16 +249,18 @@ mod cmd {
.await?,
);

let admin = fluvio.admin().await;

#[cfg(feature = "producer-file-io")]
if self.raw {
self.process_raw_file(&producer).await?;
} else {
self.produce_lines(producer.clone()).await?;
self.produce_lines(producer.clone(), &admin).await?;
};

#[cfg(not(feature = "producer-file-io"))]
{
self.produce_lines(producer.clone()).await?;
self.produce_lines(producer.clone(), &admin).await?;
}

producer.flush().await?;
Expand Down Expand Up @@ -315,7 +323,11 @@ mod cmd {
}
}

async fn produce_lines(&self, producer: Arc<TopicProducerPool>) -> Result<()> {
async fn produce_lines(
&self,
producer: Arc<TopicProducerPool>,
admin: &FluvioAdmin,
) -> Result<()> {
#[cfg(feature = "producer-file-io")]
if let Some(path) = &self.file {
let reader = BufReader::new(File::open(path)?);
Expand All @@ -340,7 +352,7 @@ mod cmd {
.collect::<Result<Vec<_>, _>>()?;
}
} else {
self.producer_stdin(&producer).await?
self.producer_stdin(&producer, admin).await?
}

#[cfg(not(feature = "producer-file-io"))]
Expand All @@ -349,27 +361,55 @@ mod cmd {
Ok(())
}

async fn producer_stdin(&self, producer: &Arc<TopicProducerPool>) -> Result<()> {
let mut lines = BufReader::new(std::io::stdin()).lines();
async fn producer_stdin(
&self,
producer: &Arc<TopicProducerPool>,
admin: &FluvioAdmin,
) -> Result<()> {
use async_std::io::prelude::*;
use async_std::io::BufReader;
let mut lines = BufReader::new(stdin()).lines();
let mut partition_stream = admin.watch::<TopicSpec>().await?;

if self.interactive_mode() {
eprint!("> ");
}

while let Some(Ok(line)) = lines.next() {
let produce_output = self.produce_line(producer, &line).await?;

if let Some(produce_output) = produce_output {
if self.delivery_semantic != DeliverySemantic::AtMostOnce {
// ensure it was properly sent
produce_output.wait().await?;
loop {
select! {
line = lines.next() => {
if let Some(Ok(line)) = line {
let produce_output = self.produce_line(producer, &line).await?;

if let Some(produce_output) = produce_output {
if self.delivery_semantic != DeliverySemantic::AtMostOnce {
// ensure it was properly sent
produce_output.wait().await?;
}
}

if self.interactive_mode() {
print_cli_ok!();
eprint!("> ");
}
} else {
// When stdin is closed, we break the loop
break;
}
}
stream = partition_stream.next() => {
if let Some(stream) = stream {
let stream = stream?;
for change in stream.inner().changes {
if change.header == MsgType::DELETE && change.content.name == self.topic {
return Err(CliError::TopicDeleted(self.topic.clone()).into());
}
}
}
}
}

if self.interactive_mode() {
print_cli_ok!();
eprint!("> ");
}
}

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ pub enum CliError {
SmartModuleConfigBuilder(#[from] fluvio_smartengine::SmartModuleConfigBuilderError),
#[error("Hub error: {0}")]
HubError(String),
#[error("Topic \"{0}\" was deleted")]
TopicDeleted(String),
}
20 changes: 20 additions & 0 deletions tests/cli/fluvio_smoke_tests/produce-error.bats
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,23 @@ teardown_file() {
run bash -c 'echo abcdefgh | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_2" --compression lz4'
assert_failure
}

# Delete topic should stop producer and return error
@test "Delete topic while producing" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi

run bash -c '/usr/bin/expect <<-EOF
spawn "$FLUVIO_BIN" produce "$TOPIC_NAME"
expect "> "
exec "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
expect "Topic \"$TOPIC_NAME\" was deleted"
exit
EOF'
assert_success
assert_output --partial "Topic \"$TOPIC_NAME\" was deleted"
}
4 changes: 2 additions & 2 deletions tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ setup_file() {
}

@test "Home status at remote 1 should show the home cluster connected" {
sleep 15
sleep 30
run timeout 15s "$FLUVIO_BIN" home status

assert_success
Expand Down Expand Up @@ -165,7 +165,7 @@ setup_file() {
}

@test "Home status at remote 2 should show the home cluster connected" {
sleep 15
sleep 30
run timeout 15s "$FLUVIO_BIN" home status

assert_success
Expand Down

0 comments on commit 88a6e8f

Please sign in to comment.