From d3026ace2187993ca3eb0af3fd67ae650a5a4853 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 7 Nov 2024 13:28:24 -0800 Subject: [PATCH] Don't overwrite udfs --- crates/arroyo-operator/src/operator.rs | 29 ++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/crates/arroyo-operator/src/operator.rs b/crates/arroyo-operator/src/operator.rs index 3cae5734a..de48ffc40 100644 --- a/crates/arroyo-operator/src/operator.rs +++ b/crates/arroyo-operator/src/operator.rs @@ -2,7 +2,7 @@ use crate::context::{ArrowContext, BatchReceiver}; use crate::inq_reader::InQReader; use crate::udfs::{ArroyoUdaf, UdafArg}; use crate::{CheckpointCounter, ControlOutcome, SourceFinishType}; -use anyhow::anyhow; +use anyhow::{anyhow, bail}; use arrow::array::RecordBatch; use arrow::datatypes::DataType; use arrow::datatypes::Schema; @@ -31,10 +31,13 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::future::Future; +use std::io::ErrorKind; use std::path::Path; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime}; +use tokio::fs::OpenOptions; +use tokio::io::AsyncWriteExt; use tokio::sync::Barrier; use tokio_stream::StreamExt; use tracing::{debug, error, info, trace, warn, Instrument}; @@ -638,7 +641,6 @@ impl Registry { ) }); - // write the dylib to a local file let local_udfs_dir = "/tmp/arroyo/local_udfs"; tokio::fs::create_dir_all(local_udfs_dir) .await @@ -649,9 +651,28 @@ impl Registry { .ok_or_else(|| anyhow!("Invalid dylib path: {}", config.dylib_path))?; let local_dylib_path = Path::new(local_udfs_dir).join(dylib_file_name); - tokio::fs::write(&local_dylib_path, udf) + // write the dylib to a local file if it's not already present + match OpenOptions::new() + .write(true) + .create_new(true) + .open(&local_dylib_path) .await - .map_err(|e| anyhow!("unable to write dylib to file: {:?}", e))?; + { + Ok(mut file) => { + file.write_all(&udf).await?; + file.sync_all().await?; + } + Err(e) if e.kind() == ErrorKind::AlreadyExists => { + // nothing to do, UDF already written + } + Err(e) => { + bail!( + "Failed to write UDF dylib to {}: {}", + local_dylib_path.to_string_lossy(), + e + ); + } + }; let interface = if config.is_async { UdfInterface::Async(Arc::new(ContainerOrLocal::Container(unsafe {