diff --git a/crates/polars-io/src/cloud/adaptors.rs b/crates/polars-io/src/cloud/adaptors.rs
index 6037216b2a0d9..09dc5d3eeaa8d 100644
--- a/crates/polars-io/src/cloud/adaptors.rs
+++ b/crates/polars-io/src/cloud/adaptors.rs
@@ -148,7 +148,7 @@ pub struct CloudWriter {
     // The Tokio runtime which the writer uses internally.
     runtime: tokio::runtime::Runtime,
     // Internal writer, constructed at creation
-    writer: std::sync::Mutex<Box<dyn AsyncWrite + Send + Unpin>>,
+    writer: Box<dyn AsyncWrite + Send + Unpin>,
 }
 
 impl CloudWriter {
@@ -198,11 +198,10 @@ impl CloudWriter {
         path: &Path,
     ) -> object_store::Result<(
        MultipartId,
-        std::sync::Mutex<Box<dyn AsyncWrite + Send + Unpin>>,
+        Box<dyn AsyncWrite + Send + Unpin>,
     )> {
-        let (multipart_id, async_s3_writer) = object_store.put_multipart(path).await?;
-        let sync_s3_uploader = std::sync::Mutex::new(async_s3_writer);
-        Ok((multipart_id, sync_s3_uploader))
+        let (multipart_id, s3_writer) = object_store.put_multipart(path).await?;
+        Ok((multipart_id, s3_writer))
     }
 
     fn abort(&self) {
@@ -216,8 +215,7 @@ impl CloudWriter {
 
 impl std::io::Write for CloudWriter {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let mut writer = self.writer.lock().unwrap();
-        let res = self.runtime.block_on(writer.write(buf));
+        let res = self.runtime.block_on(self.writer.write(buf));
         if res.is_err() {
             self.abort();
         }
@@ -225,8 +223,7 @@ impl std::io::Write for CloudWriter {
     }
 
     fn flush(&mut self) -> std::io::Result<()> {
-        let mut writer = self.writer.lock().unwrap();
-        let res = self.runtime.block_on(writer.flush());
+        let res = self.runtime.block_on(self.writer.flush());
         if res.is_err() {
             self.abort();
         }
@@ -236,8 +233,7 @@ impl std::io::Write for CloudWriter {
 
 impl Drop for CloudWriter {
     fn drop(&mut self) {
-        let mut writer = self.writer.lock().unwrap();
-        let _ = self.runtime.block_on(writer.shutdown());
+        let _ = self.runtime.block_on(self.writer.shutdown());
     }
 }
diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs
index 1114bc5fbf8cf..ce1326379141e 100644
--- a/crates/polars-pipe/src/executors/sinks/file_sink.rs
+++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs
@@ -110,7 +110,7 @@ impl ParquetCloudSink {
             .set_parallel(false)
             .batched(schema)?;
 
-        let writer = Box::new(writer) as Box<dyn SinkWriter + Send + Sync>;
+        let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
 
         let morsels_per_sink = morsels_per_sink();
         let backpressure = morsels_per_sink * 2;
@@ -164,7 +164,7 @@ impl IpcSink {
 #[cfg(any(feature = "parquet", feature = "ipc"))]
 fn init_writer_thread(
     receiver: Receiver<Option<DataChunk>>,
-    mut writer: Box<dyn SinkWriter + Send + Sync>,
+    mut writer: Box<dyn SinkWriter + Send>,
     maintain_order: bool,
     // this is used to determine when a batch of chunks should be written to disk
     // all chunks per push should be collected to determine in which order they should
diff --git a/crates/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs
index eeeec8f148798..0acf4d434c66f 100644
--- a/crates/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs
+++ b/crates/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs
@@ -107,9 +107,9 @@ pub fn can_convert_to_hash_agg(
 }
 
 /// # Returns:
-///  - input_dtype: dtype that goes into the agg expression
-///  - physical expr: physical expression that produces the input of the aggregation
-///  - aggregation function: the aggregation function
+/// - input_dtype: dtype that goes into the agg expression
+/// - physical expr: physical expression that produces the input of the aggregation
+/// - aggregation function: the aggregation function
 pub(crate) fn convert_to_hash_agg(
     node: Node,
     expr_arena: &Arena<AExpr>,