Skip to content

Commit

Permalink
Merge pull request #246 from influxdata/crepererum/drop_pin_project_lite
Browse files Browse the repository at this point in the history
chore: drop `pin-project-lite`
  • Loading branch information
alamb authored Aug 6, 2024
2 parents fad1a83 + e8c97af commit a331d09
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 126 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ futures = "0.3"
integer-encoding = "4"
lz4 = { version = "1.23", optional = true }
parking_lot = "0.12"
pin-project-lite = "0.2"
rand = "0.8"
rustls = { version = "0.23", optional = true, default-features = false, features = ["logging", "ring", "std", "tls12"] }
snap = { version = "1", optional = true }
Expand Down
21 changes: 8 additions & 13 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use chrono::{TimeZone, Utc};
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode,
};
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
use parking_lot::Once;
use pin_project_lite::pin_project;
use rdkafka::{
consumer::{Consumer, StreamConsumer as RdStreamConsumer},
producer::{FutureProducer, FutureRecord},
Expand Down Expand Up @@ -256,17 +255,14 @@ where
fn time_it(self) -> Self::TimeItFut {
TimeIt {
t_start: Instant::now(),
inner: self,
inner: Box::pin(self),
}
}
}

pin_project! {
struct TimeIt<F> {
t_start: Instant,
#[pin]
inner: F,
}
struct TimeIt<F> {
t_start: Instant,
inner: Pin<Box<F>>,
}

impl<F> Future for TimeIt<F>
Expand All @@ -275,10 +271,9 @@ where
{
type Output = Duration;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Ready(_) => Poll::Ready(this.t_start.elapsed()),
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
match self.inner.poll_unpin(cx) {
Poll::Ready(_) => Poll::Ready(self.t_start.elapsed()),
Poll::Pending => Poll::Pending,
}
}
Expand Down
59 changes: 31 additions & 28 deletions fuzz/fuzz_targets/protocol_reader.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
#![no_main]
use std::{collections::HashMap, io::Cursor, sync::Arc, time::Duration};
use std::{
collections::HashMap,
io::Cursor,
ops::DerefMut,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use libfuzzer_sys::fuzz_target;
use pin_project_lite::pin_project;
use rskafka::{
build_info::DEFAULT_CLIENT_ID,
messenger::Messenger,
Expand Down Expand Up @@ -151,16 +158,12 @@ where
})
}

pin_project! {
/// One-way mock transport with limited data.
///
/// Can only be read. Writes go to `/dev/null`.
struct MockTransport {
#[pin]
data: Cursor<Vec<u8>>,
#[pin]
sink: Sink,
}
/// One-way mock transport with limited data.
///
/// Can only be read. Writes go to `/dev/null`.
struct MockTransport {
data: Cursor<Vec<u8>>,
sink: Sink,
}

impl MockTransport {
Expand All @@ -174,34 +177,34 @@ impl MockTransport {

impl AsyncWrite for MockTransport {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
self.project().sink.poll_write(cx, buf)
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.deref_mut().sink).poll_write(cx, buf)
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().sink.poll_flush(cx)
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.deref_mut().sink).poll_flush(cx)
}

fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().sink.poll_shutdown(cx)
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.deref_mut().sink).poll_shutdown(cx)
}
}

impl AsyncRead for MockTransport {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().data.poll_read(cx, buf)
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.deref_mut().data).poll_read(cx, buf)
}
}
82 changes: 39 additions & 43 deletions src/client/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use std::time::Duration;

use futures::future::{BoxFuture, Fuse, FusedFuture, FutureExt};
use futures::Stream;
use pin_project_lite::pin_project;
use tracing::{debug, trace, warn};

use crate::{
Expand Down Expand Up @@ -196,61 +195,58 @@ impl FetchClient for PartitionClient {
}
}

pin_project! {
/// Stream consuming data from start offset.
///
/// # Error Handling
/// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`) then the stream will emit this error
/// once and will terminate afterwards.
pub struct StreamConsumer {
client: Arc<dyn FetchClient>,
/// Stream consuming data from start offset.
///
/// # Error Handling
/// If an error is returned by [`fetch_records`](`PartitionClient::fetch_records`) then the stream will emit this error
/// once and will terminate afterwards.
pub struct StreamConsumer {
client: Arc<dyn FetchClient>,

min_batch_size: i32,
min_batch_size: i32,

max_batch_size: i32,
max_batch_size: i32,

max_wait_ms: i32,
max_wait_ms: i32,

start_offset: StartOffset,
start_offset: StartOffset,

next_offset: Option<i64>,
next_offset: Option<i64>,

next_backoff: Option<Duration>,
next_backoff: Option<Duration>,

terminated: bool,
terminated: bool,

last_high_watermark: i64,
last_high_watermark: i64,

buffer: VecDeque<RecordAndOffset>,
buffer: VecDeque<RecordAndOffset>,

fetch_fut: Fuse<BoxFuture<'static, FetchResult>>,
}
fetch_fut: Fuse<BoxFuture<'static, FetchResult>>,
}

impl Stream for StreamConsumer {
type Item = Result<(RecordAndOffset, i64)>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if *this.terminated {
if self.terminated {
return Poll::Ready(None);
}
if let Some(x) = this.buffer.pop_front() {
return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
if let Some(x) = self.buffer.pop_front() {
return Poll::Ready(Some(Ok((x, self.last_high_watermark))));
}

if this.fetch_fut.is_terminated() {
let next_offset = *this.next_offset;
let start_offset = *this.start_offset;
let bytes = (*this.min_batch_size)..(*this.max_batch_size);
let max_wait_ms = *this.max_wait_ms;
let next_backoff = std::mem::take(this.next_backoff);
let client = Arc::clone(this.client);
if self.fetch_fut.is_terminated() {
let next_offset = self.next_offset;
let start_offset = self.start_offset;
let bytes = (self.min_batch_size)..(self.max_batch_size);
let max_wait_ms = self.max_wait_ms;
let next_backoff = std::mem::take(&mut self.next_backoff);
let client = Arc::clone(&self.client);

trace!(?start_offset, ?next_offset, "Fetching records at offset");

*this.fetch_fut = FutureExt::fuse(Box::pin(async move {
self.fetch_fut = FutureExt::fuse(Box::pin(async move {
if let Some(backoff) = next_backoff {
tokio::time::sleep(backoff).await;
}
Expand Down Expand Up @@ -282,9 +278,9 @@ impl Stream for StreamConsumer {
}));
}

let data: FetchResult = futures::ready!(this.fetch_fut.poll_unpin(cx));
let data: FetchResult = futures::ready!(self.fetch_fut.poll_unpin(cx));

match (data, *this.start_offset) {
match (data, self.start_offset) {
(Ok(inner), _) => {
let FetchResultOk {
mut records_and_offsets,
Expand All @@ -300,14 +296,14 @@ impl Stream for StreamConsumer {
// Remember used offset (might be overwritten if there was any data) so we don't refetch the
// earliest / latest offset for every try. Also fetching the latest offset might be racy otherwise,
// since we'll never be in a position where the latest one can actually be fetched.
*this.next_offset = Some(used_offset);
self.next_offset = Some(used_offset);

// Sort records by offset in case they aren't in order
records_and_offsets.sort_by_key(|x| x.offset);
*this.last_high_watermark = watermark;
self.last_high_watermark = watermark;
if let Some(x) = records_and_offsets.last() {
*this.next_offset = Some(x.offset + 1);
this.buffer.extend(records_and_offsets)
self.next_offset = Some(x.offset + 1);
self.buffer.extend(records_and_offsets)
}
continue;
}
Expand All @@ -320,25 +316,25 @@ impl Stream for StreamConsumer {
StartOffset::Earliest | StartOffset::Latest,
) => {
// wipe offset and try again
*this.next_offset = None;
self.next_offset = None;

// This will only happen if retention / deletions happen after we've asked for the earliest/latest
// offset and our "fetch" request. This should be a rather rare event, but if something is horrible
// wrong in our cluster (e.g. some actor is spamming "delete" requests) then let's at least backoff
// a bit.
let backoff_secs = 1;
warn!(
start_offset=?this.start_offset,
start_offset=?self.start_offset,
backoff_secs,
"Records are gone between ListOffsets and Fetch, backoff a bit",
);
*this.next_backoff = Some(Duration::from_secs(backoff_secs));
self.next_backoff = Some(Duration::from_secs(backoff_secs));

continue;
}
// if we have an offset, terminate the stream
(Err(e), _) => {
*this.terminated = true;
self.terminated = true;

// report error once
return Poll::Ready(Some(Err(e)));
Expand Down
Loading

0 comments on commit a331d09

Please sign in to comment.