Commit cfe8e5b: Remove the Mutex from CloudWriter

Qqwy committed Jul 28, 2023
1 parent 326e5ae

Showing 3 changed files with 14 additions and 18 deletions.
18 changes: 7 additions & 11 deletions crates/polars-io/src/cloud/adaptors.rs
@@ -148,7 +148,7 @@ pub struct CloudWriter {
     // The Tokio runtime which the writer uses internally.
     runtime: tokio::runtime::Runtime,
     // Internal writer, constructed at creation
-    writer: std::sync::Mutex<Box<dyn AsyncWrite + Send + Unpin>>,
+    writer: Box<dyn AsyncWrite + Send + Unpin>,
 }

 impl CloudWriter {
@@ -198,11 +198,10 @@ impl CloudWriter {
         path: &Path,
     ) -> object_store::Result<(
         MultipartId,
-        std::sync::Mutex<Box<dyn AsyncWrite + Send + Unpin>>,
+        Box<dyn AsyncWrite + Send + Unpin>,
     )> {
-        let (multipart_id, async_s3_writer) = object_store.put_multipart(path).await?;
-        let sync_s3_uploader = std::sync::Mutex::new(async_s3_writer);
-        Ok((multipart_id, sync_s3_uploader))
+        let (multipart_id, s3_writer) = object_store.put_multipart(path).await?;
+        Ok((multipart_id, s3_writer))
     }

     fn abort(&self) {
@@ -216,17 +215,15 @@ impl CloudWriter {

 impl std::io::Write for CloudWriter {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let mut writer = self.writer.lock().unwrap();
-        let res = self.runtime.block_on(writer.write(buf));
+        let res = self.runtime.block_on(self.writer.write(buf));
         if res.is_err() {
             self.abort();
         }
         res
     }

     fn flush(&mut self) -> std::io::Result<()> {
-        let mut writer = self.writer.lock().unwrap();
-        let res = self.runtime.block_on(writer.flush());
+        let res = self.runtime.block_on(self.writer.flush());
         if res.is_err() {
             self.abort();
         }
@@ -236,8 +233,7 @@ impl std::io::Write for CloudWriter {

 impl Drop for CloudWriter {
     fn drop(&mut self) {
-        let mut writer = self.writer.lock().unwrap();
-        let _ = self.runtime.block_on(writer.shutdown());
+        let _ = self.runtime.block_on(self.writer.shutdown());
     }
 }

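The change works because std::io::Write::write and flush take &mut self: the borrow checker already guarantees the caller exclusive access to the writer, so the inner Mutex bought no extra safety and only added locking overhead plus a lock().unwrap() panic path. Below is a minimal, self-contained sketch of the same sync-over-async bridge; the SyncBridge name and the tokio::io::sink() test writer are illustrative stand-ins, not the polars-io implementation.

    // Minimal sketch of a synchronous `Write` adapter over an async writer.
    // `SyncBridge` is a hypothetical name, not the polars-io type.
    use std::io::Write;

    use tokio::io::{AsyncWrite, AsyncWriteExt};

    struct SyncBridge {
        // Dedicated runtime used to block on the async writer.
        runtime: tokio::runtime::Runtime,
        writer: Box<dyn AsyncWrite + Send + Unpin>,
    }

    impl Write for SyncBridge {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            // `&mut self` already gives exclusive access; no Mutex required.
            self.runtime.block_on(self.writer.write(buf))
        }

        fn flush(&mut self) -> std::io::Result<()> {
            self.runtime.block_on(self.writer.flush())
        }
    }

    fn main() -> std::io::Result<()> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        let mut bridge = SyncBridge {
            runtime,
            // `tokio::io::sink()` discards all input; it stands in for a real upload stream.
            writer: Box::new(tokio::io::sink()),
        };
        bridge.write_all(b"hello")?;
        bridge.flush()
    }

Blocking on a dedicated runtime keeps the adapter usable from synchronous code paths; the trade-off is that each write call parks the calling thread until the async write completes.
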
8 changes: 4 additions & 4 deletions crates/polars-pipe/src/executors/sinks/file_sink.rs
@@ -66,7 +66,7 @@ impl ParquetSink {
             .set_parallel(false)
             .batched(schema)?;

-        let writer = Box::new(writer) as Box<dyn SinkWriter + Send + Sync>;
+        let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

         let morsels_per_sink = morsels_per_sink();
         let backpressure = morsels_per_sink * 2;
@@ -110,7 +110,7 @@ impl ParquetCloudSink {
             .set_parallel(false)
             .batched(schema)?;

-        let writer = Box::new(writer) as Box<dyn SinkWriter + Send + Sync>;
+        let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

         let morsels_per_sink = morsels_per_sink();
         let backpressure = morsels_per_sink * 2;
@@ -141,7 +141,7 @@ impl IpcSink {
             .with_compression(options.compression)
             .batched(schema)?;

-        let writer = Box::new(writer) as Box<dyn SinkWriter + Send + Sync>;
+        let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

         let morsels_per_sink = morsels_per_sink();
         let backpressure = morsels_per_sink * 2;
@@ -164,7 +164,7 @@ impl IpcSink {
 #[cfg(any(feature = "parquet", feature = "ipc"))]
 fn init_writer_thread(
     receiver: Receiver<Option<DataChunk>>,
-    mut writer: Box<dyn SinkWriter + Send + Sync>,
+    mut writer: Box<dyn SinkWriter + Send>,
     maintain_order: bool,
     // this is used to determine when a batch of chunks should be written to disk
     // all chunks per push should be collected to determine in which order they should
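Dropping the Sync bound is sound because each boxed writer is moved into a single dedicated writer thread: Send permits the move, while Sync would only be needed if multiple threads shared a &-reference to the writer, which this channel-based design never does. A minimal sketch of the pattern, using a hypothetical SinkWriter trait and std::sync::mpsc in place of the polars-pipe types:

    use std::sync::mpsc::Receiver;
    use std::thread::JoinHandle;

    // Hypothetical stand-in for the polars-pipe `SinkWriter` trait; chunks are
    // plain byte vectors here rather than DataChunk.
    trait SinkWriter {
        fn write_chunk(&mut self, chunk: Vec<u8>);
    }

    fn init_writer_thread(
        receiver: Receiver<Option<Vec<u8>>>,
        // `Send` is enough: the box is *moved* into the spawned thread, so only
        // that one thread ever touches it. `Sync` governs shared `&T` access,
        // which never occurs here.
        mut writer: Box<dyn SinkWriter + Send>,
    ) -> JoinHandle<()> {
        std::thread::spawn(move || {
            // `None` signals end-of-stream, mirroring the Option-wrapped chunks.
            while let Ok(Some(chunk)) = receiver.recv() {
                writer.write_chunk(chunk);
            }
        })
    }

    fn main() {
        struct PrintWriter;
        impl SinkWriter for PrintWriter {
            fn write_chunk(&mut self, chunk: Vec<u8>) {
                println!("wrote {} bytes", chunk.len());
            }
        }

        let (sender, receiver) = std::sync::mpsc::channel();
        let handle = init_writer_thread(receiver, Box::new(PrintWriter));
        sender.send(Some(vec![1, 2, 3])).unwrap();
        sender.send(None).unwrap();
        handle.join().unwrap();
    }

Relaxing the bound also widens what the sink machinery accepts: writer types with non-Sync internals now satisfy the trait object.
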
@@ -107,9 +107,9 @@ pub fn can_convert_to_hash_agg(
 }

 /// # Returns:
-/// - input_dtype: dtype that goes into the agg expression
-/// - physical expr: physical expression that produces the input of the aggregation
-/// - aggregation function: the aggregation function
+/// - input_dtype: dtype that goes into the agg expression
+/// - physical expr: physical expression that produces the input of the aggregation
+/// - aggregation function: the aggregation function
 pub(crate) fn convert_to_hash_agg<F>(
     node: Node,
     expr_arena: &Arena<AExpr>,
