-
Notifications
You must be signed in to change notification settings - Fork 486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add fluvio producer callback #4340
Conversation
21775b3
to
a283846
Compare
530de55
to
c182387
Compare
self.stat.start(); | ||
let time = std::time::Instant::now(); | ||
let send_out = self | ||
self.stat.start().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Start is setting the first instant to calculate the final bandwidth at the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed!
.fluvio_producer | ||
.send(record.key, record.data.clone()) | ||
.await?; | ||
|
||
self.stat.send_out((send_out, time)); | ||
self.stat.add_record(record.data.len() as u64).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer need this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed!
} | ||
// send end | ||
let hist = hist.lock().await; | ||
let start_time = start_time.lock().await.expect("start time"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start time should be tracked by callback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the first start time, I can rename it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using first start time from callback now
// send end | ||
let hist = hist.lock().await; | ||
let start_time = start_time.lock().await.expect("start time"); | ||
let elapsed = start_time.elapsed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be come from callback
crates/fluvio/src/producer/config.rs
Outdated
@@ -118,6 +120,9 @@ pub struct TopicProducerConfig { | |||
|
|||
#[builder(default)] | |||
pub(crate) smartmodules: Vec<SmartModuleInvocation>, | |||
|
|||
#[builder(setter(into, strip_option), default)] | |||
pub(crate) callback: Option<SharedProducerCallback<ProduceCompletionEvent>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why make generic? don't think there is need to make generic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
It was to make it easy to handle lifetimes in my first implementation. But it's not needed for actual implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed them
created_at: batch_metadata.created_at, | ||
metadata: batch_metadata.clone(), | ||
}; | ||
callback.finished(event).await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unwrap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed!
} | ||
|
||
pub(crate) fn set_current_time(&mut self) { | ||
self.start_time = Instant::now(); | ||
pub(crate) fn add_record(&mut self, bytes: u64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be invoked by callback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invoked by callback now
@@ -318,7 +332,8 @@ type ProduceResponseFuture = Shared<BoxFuture<'static, Arc<Result<ProduceRespons | |||
|
|||
/// A Future that resolves to pair `base_offset` and `error_code`, which effectively come from | |||
/// [`PartitionProduceResponse`]. | |||
pub(crate) struct ProducePartitionResponseFuture { | |||
#[derive(Debug, Clone)] | |||
pub struct ProducePartitionResponseFuture { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is still need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, I reverted it.
created_at: batch_metadata.created_at, | ||
metadata: batch_metadata.clone(), | ||
}; | ||
callback.finished(event).await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be trigger when flush happen not wait callback is waiting. this will block sender
crates/fluvio/src/producer/record.rs
Outdated
@@ -34,7 +35,7 @@ impl RecordMetadata { | |||
} | |||
|
|||
/// Possible states of a batch in the accumulator | |||
pub(crate) enum BatchMetadataState { | |||
pub enum BatchMetadataState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this need to be make it public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reverted it.
crates/fluvio/src/producer/record.rs
Outdated
} | ||
} | ||
|
||
/// Wait for the base offset of the batch. This is the offset of the first | ||
/// record in the batch and it is known once the batch is sent to the server. | ||
pub(crate) async fn base_offset(&self) -> Result<Offset> { | ||
pub async fn base_offset(&self) -> Result<Offset> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should not be change into public
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reverted it.
eb0d036
to
c53c036
Compare
c53c036
to
145d6aa
Compare
crates/fluvio/src/producer/record.rs
Outdated
@@ -43,14 +44,16 @@ pub(crate) enum BatchMetadataState { | |||
Failed(ProducerError), | |||
} | |||
|
|||
pub(crate) struct BatchMetadata { | |||
state: RwLock<BatchMetadataState>, | |||
pub struct BatchMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is still need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, revert it!
e8abcb5
to
2f0836b
Compare
let elapsed = created_at.elapsed(); | ||
let event = ProduceCompletionBatchEvent { | ||
created_at, | ||
topic: self.replica.topic.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is expensive (cloning string)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're not using it, removed!
2f0836b
to
0e7380c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Nice job
This PR adds a callback option for the producer.
Also prints total bytes and total records per sec at the end: