From 88a6e8fdcd6cfce6df82dd0a016b96fd7e073cd0 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Wed, 21 Aug 2024 20:56:50 -0300 Subject: [PATCH] feat: close producer when topic is deleted --- Cargo.lock | 1 + crates/fluvio-cli/Cargo.toml | 1 + crates/fluvio-cli/src/client/produce/mod.rs | 80 ++++++++++++++----- crates/fluvio-cli/src/error.rs | 2 + .../cli/fluvio_smoke_tests/produce-error.bats | 20 +++++ .../e2e/fluvio-core.bats | 4 +- 6 files changed, 86 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 524ee78d55..b90867a00c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2440,6 +2440,7 @@ version = "0.0.0" dependencies = [ "anyhow", "async-channel 1.9.0", + "async-std", "async-trait", "bytesize", "clap", diff --git a/crates/fluvio-cli/Cargo.toml b/crates/fluvio-cli/Cargo.toml index 4b3865e20b..0ababbc700 100644 --- a/crates/fluvio-cli/Cargo.toml +++ b/crates/fluvio-cli/Cargo.toml @@ -39,6 +39,7 @@ producer-file-io = ["fluvio-cli-common/file-records"] [dependencies] async-channel = { workspace = true } async-trait = { workspace = true } +async-std = { workspace = true } anyhow = { workspace = true } bytesize = { workspace = true, features = ['serde'] } clap = { workspace = true, features = ["std", "derive", "string", "help", "usage", "env", "error-context"] } diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index f8c1ffb633..5afe4682dd 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -11,17 +11,23 @@ mod cmd { #[cfg(feature = "producer-file-io")] use std::path::PathBuf; + use async_std::io::stdin; use async_trait::async_trait; + use fluvio_future::io::StreamExt; + use fluvio_sc_schema::message::MsgType; + use fluvio_sc_schema::topic::TopicSpec; #[cfg(feature = "producer-file-io")] use futures::future::join_all; use clap::Parser; + use tokio::select; use tracing::{error, warn}; use humantime::parse_duration; use anyhow::Result; use fluvio::{ - Compression, Fluvio, FluvioError, TopicProducerPool, TopicProducerConfigBuilder, RecordKey, - ProduceOutput, DeliverySemantic, SmartModuleContextData, Isolation, SmartModuleInvocation, + Compression, DeliverySemantic, Fluvio, FluvioAdmin, FluvioError, Isolation, ProduceOutput, + RecordKey, SmartModuleContextData, SmartModuleInvocation, TopicProducerConfigBuilder, + TopicProducerPool, }; use fluvio_extension_common::Terminal; use fluvio_types::print_cli_ok; @@ -243,16 +249,18 @@ mod cmd { .await?, ); + let admin = fluvio.admin().await; + #[cfg(feature = "producer-file-io")] if self.raw { self.process_raw_file(&producer).await?; } else { - self.produce_lines(producer.clone()).await?; + self.produce_lines(producer.clone(), &admin).await?; }; #[cfg(not(feature = "producer-file-io"))] { - self.produce_lines(producer.clone()).await?; + self.produce_lines(producer.clone(), &admin).await?; } producer.flush().await?; @@ -315,7 +323,11 @@ mod cmd { } } - async fn produce_lines(&self, producer: Arc) -> Result<()> { + async fn produce_lines( + &self, + producer: Arc, + admin: &FluvioAdmin, + ) -> Result<()> { #[cfg(feature = "producer-file-io")] if let Some(path) = &self.file { let reader = BufReader::new(File::open(path)?); @@ -340,7 +352,7 @@ mod cmd { .collect::, _>>()?; } } else { - self.producer_stdin(&producer).await? + self.producer_stdin(&producer, admin).await? } #[cfg(not(feature = "producer-file-io"))] @@ -349,27 +361,55 @@ mod cmd { Ok(()) } - async fn producer_stdin(&self, producer: &Arc) -> Result<()> { - let mut lines = BufReader::new(std::io::stdin()).lines(); + async fn producer_stdin( + &self, + producer: &Arc, + admin: &FluvioAdmin, + ) -> Result<()> { + use async_std::io::prelude::*; + use async_std::io::BufReader; + let mut lines = BufReader::new(stdin()).lines(); + let mut partition_stream = admin.watch::().await?; + if self.interactive_mode() { eprint!("> "); } - while let Some(Ok(line)) = lines.next() { - let produce_output = self.produce_line(producer, &line).await?; - - if let Some(produce_output) = produce_output { - if self.delivery_semantic != DeliverySemantic::AtMostOnce { - // ensure it was properly sent - produce_output.wait().await?; + loop { + select! { + line = lines.next() => { + if let Some(Ok(line)) = line { + let produce_output = self.produce_line(producer, &line).await?; + + if let Some(produce_output) = produce_output { + if self.delivery_semantic != DeliverySemantic::AtMostOnce { + // ensure it was properly sent + produce_output.wait().await?; + } + } + + if self.interactive_mode() { + print_cli_ok!(); + eprint!("> "); + } + } else { + // When stdin is closed, we break the loop + break; + } + } + stream = partition_stream.next() => { + if let Some(stream) = stream { + let stream = stream?; + for change in stream.inner().changes { + if change.header == MsgType::DELETE && change.content.name == self.topic { + return Err(CliError::TopicDeleted(self.topic.clone()).into()); + } + } + } } - } - - if self.interactive_mode() { - print_cli_ok!(); - eprint!("> "); } } + Ok(()) } diff --git a/crates/fluvio-cli/src/error.rs b/crates/fluvio-cli/src/error.rs index 9bb0d54366..8dd130a563 100644 --- a/crates/fluvio-cli/src/error.rs +++ b/crates/fluvio-cli/src/error.rs @@ -67,4 +67,6 @@ pub enum CliError { SmartModuleConfigBuilder(#[from] fluvio_smartengine::SmartModuleConfigBuilderError), #[error("Hub error: {0}")] HubError(String), + #[error("Topic \"{0}\" was deleted")] + TopicDeleted(String), } diff --git a/tests/cli/fluvio_smoke_tests/produce-error.bats b/tests/cli/fluvio_smoke_tests/produce-error.bats index 3c5cdf02ec..f7f64bfa15 100644 --- a/tests/cli/fluvio_smoke_tests/produce-error.bats +++ b/tests/cli/fluvio_smoke_tests/produce-error.bats @@ -57,3 +57,23 @@ teardown_file() { run bash -c 'echo abcdefgh | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_2" --compression lz4' assert_failure } + +# Delete topic should stop producer and return error +@test "Delete topic while producing" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + + run bash -c '/usr/bin/expect <<-EOF + spawn "$FLUVIO_BIN" produce "$TOPIC_NAME" + expect "> " + exec "$FLUVIO_BIN" topic delete "$TOPIC_NAME" + expect "Topic \"$TOPIC_NAME\" was deleted" + exit + EOF' + assert_success + assert_output --partial "Topic \"$TOPIC_NAME\" was deleted" +} diff --git a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats index 0bb4ffc072..6839d8ee87 100644 --- a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats +++ b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats @@ -130,7 +130,7 @@ setup_file() { } @test "Home status at remote 1 should show the home cluster connected" { - sleep 15 + sleep 30 run timeout 15s "$FLUVIO_BIN" home status assert_success @@ -165,7 +165,7 @@ setup_file() { } @test "Home status at remote 2 should show the home cluster connected" { - sleep 15 + sleep 30 run timeout 15s "$FLUVIO_BIN" home status assert_success