From cae7aadc932ed38948f6a8377755a495291a6a32 Mon Sep 17 00:00:00 2001
From: Jackson Newhouse <jacksonrnewhouse@gmail.com>
Date: Fri, 27 Oct 2023 16:03:55 -0700
Subject: [PATCH] sql: Implement Delta Lake Sink

arroyo-state: added additional message-passing data to the commit loop, so
that subtask 0 is responsible for finishing files.
arroyo-worker: introduced CommitStrategy, an enum of PerOperator and
PerSubtask, to TwoPhaseCommitter.
arroyo-connectors: refactored the filesystem connector so that it can be
reused by the Delta connector.
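
For reviewers, a minimal sketch of the new commit-strategy wiring (the enum
shape and the mapping below are taken from this patch; derives and trait
bounds are elided):

    // arroyo-worker/src/connectors/two_phase_committer.rs (shape only).
    // PerSubtask keeps the existing behavior, where every subtask finishes
    // its own files; PerOperator routes all commit data to subtask 0, which
    // performs a single commit for the operator (used by the Delta Lake sink).
    pub enum CommitStrategy {
        PerSubtask,
        PerOperator,
    }

    // arroyo-worker/src/connectors/filesystem/mod.rs picks the strategy from
    // the table's commit_style:
    //   CommitStyle::Direct    => CommitStrategy::PerSubtask
    //   CommitStyle::DeltaLake => CommitStrategy::PerOperator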

---
 Cargo.lock                                    | 221 ++++++++++++++++--
 arroyo-connectors/src/filesystem.rs           | 176 +++++++-------
 arroyo-connectors/src/lib.rs                  |   2 +
 arroyo-controller/src/job_controller/mod.rs   |  23 +-
 arroyo-controller/src/states/scheduling.rs    |  19 +-
 arroyo-macro/src/lib.rs                       |   6 +-
 arroyo-rpc/proto/rpc.proto                    |  22 ++
 arroyo-rpc/src/lib.rs                         |  14 +-
 arroyo-state/src/checkpoint_state.rs          |  39 +++-
 arroyo-state/src/committing_state.rs          |  48 +++-
 arroyo-state/src/lib.rs                       |   8 +
 arroyo-state/src/parquet.rs                   |  22 ++
 arroyo-storage/src/lib.rs                     |  55 +++--
 arroyo-worker/Cargo.toml                      |   1 +
 .../src/connectors/filesystem/delta.rs        | 196 ++++++++++++++++
 .../src/connectors/filesystem/mod.rs          | 187 ++++++++++++---
 arroyo-worker/src/connectors/impulse.rs       |   2 +-
 .../src/connectors/kafka/sink/mod.rs          |  11 +-
 .../src/connectors/kafka/source/mod.rs        |   2 +-
 .../src/connectors/kafka/source/test.rs       |   1 +
 .../src/connectors/kinesis/source/mod.rs      |   3 +-
 arroyo-worker/src/connectors/polling_http.rs  |   2 +-
 arroyo-worker/src/connectors/sse.rs           |   2 +-
 .../src/connectors/two_phase_committer.rs     |  76 +++++-
 arroyo-worker/src/connectors/websocket.rs     |   2 +-
 arroyo-worker/src/formats.rs                  |   4 +-
 arroyo-worker/src/lib.rs                      |  58 ++++-
 connector-schemas/filesystem/table.json       |   8 +
 28 files changed, 1006 insertions(+), 204 deletions(-)
 create mode 100644 arroyo-worker/src/connectors/filesystem/delta.rs

diff --git a/Cargo.lock b/Cargo.lock
index 89bde32b2..684ff25b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -189,7 +189,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
 [[package]]
 name = "arrow"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#0209c94ea4e3d924936e2127f89a885805aae1b4"
 dependencies = [
  "ahash 0.8.3",
  "arrow-arith",
@@ -210,7 +210,8 @@ dependencies = [
 [[package]]
 name = "arrow-arith"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "895263144bd4a69751cbe6a34a53f26626e19770b313a9fa792c415cd0e78f11"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -224,7 +225,7 @@ dependencies = [
 [[package]]
 name = "arrow-array"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#0209c94ea4e3d924936e2127f89a885805aae1b4"
 dependencies = [
  "ahash 0.8.3",
  "arrow-buffer",
@@ -240,7 +241,7 @@ dependencies = [
 [[package]]
 name = "arrow-buffer"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#0209c94ea4e3d924936e2127f89a885805aae1b4"
 dependencies = [
  "bytes",
  "half",
@@ -250,7 +251,8 @@ dependencies = [
 [[package]]
 name = "arrow-cast"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35e8b9990733a9b635f656efda3c9b8308c7a19695c9ec2c7046dd154f9b144b"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -267,7 +269,8 @@ dependencies = [
 [[package]]
 name = "arrow-csv"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "646fbb4e11dd0afb8083e883f53117713b8caadb4413b3c9e63e3f535da3683c"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -285,7 +288,8 @@ dependencies = [
 [[package]]
 name = "arrow-data"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da900f31ff01a0a84da0572209be72b2b6f980f3ea58803635de47913191c188"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -296,7 +300,8 @@ dependencies = [
 [[package]]
 name = "arrow-ipc"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2707a8d7ee2d345d045283ece3ae43416175873483e5d96319c929da542a0b1f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -309,7 +314,8 @@ dependencies = [
 [[package]]
 name = "arrow-json"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d1b91a63c356d14eedc778b76d66a88f35ac8498426bb0799a769a49a74a8b4"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -328,7 +334,8 @@ dependencies = [
 [[package]]
 name = "arrow-ord"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "584325c91293abbca7aaaabf8da9fe303245d641f5f4a18a6058dc68009c7ebf"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -342,7 +349,8 @@ dependencies = [
 [[package]]
 name = "arrow-row"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e32afc1329f7b372463b21c6ca502b07cf237e1ed420d87706c1770bb0ebd38"
 dependencies = [
  "ahash 0.8.3",
  "arrow-array",
@@ -356,7 +364,7 @@ dependencies = [
 [[package]]
 name = "arrow-schema"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#0209c94ea4e3d924936e2127f89a885805aae1b4"
 dependencies = [
  "serde",
 ]
@@ -364,7 +372,8 @@ dependencies = [
 [[package]]
 name = "arrow-select"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73b3ca55356d1eae07cf48808d8c462cea674393ae6ad1e0b120f40b422eb2b4"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -376,7 +385,8 @@ dependencies = [
 [[package]]
 name = "arrow-string"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af1433ce02590cae68da0a18ed3a3ed868ffac2c6f24c533ddd2067f7ee04b4a"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -785,7 +795,7 @@ dependencies = [
  "bytes",
  "object_store",
  "regex",
- "rusoto_core",
+ "rusoto_core 0.48.0",
  "thiserror",
  "tokio",
  "webpki 0.22.4",
@@ -822,6 +832,7 @@ dependencies = [
  "bincode 2.0.0-rc.3",
  "bytes",
  "chrono",
+ "deltalake",
  "eventsource-client",
  "fluvio",
  "fluvio-future",
@@ -845,7 +856,7 @@ dependencies = [
  "regex",
  "regress",
  "reqwest",
- "rusoto_core",
+ "rusoto_core 0.48.0",
  "rusoto_s3",
  "serde",
  "serde_json",
@@ -2639,6 +2650,53 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "deltalake"
+version = "0.16.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72f889be5851e2633239a5329f57eac565ef4fc37b5d283d1532ca1aaa8d74c9"
+dependencies = [
+ "arrow",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-cast",
+ "arrow-ord",
+ "arrow-row",
+ "arrow-schema",
+ "arrow-select",
+ "async-trait",
+ "bytes",
+ "cfg-if",
+ "chrono",
+ "dynamodb_lock",
+ "errno",
+ "futures",
+ "itertools 0.11.0",
+ "lazy_static",
+ "libc",
+ "log",
+ "num-bigint",
+ "num-traits",
+ "num_cpus",
+ "object_store",
+ "once_cell",
+ "parking_lot 0.12.1",
+ "parquet",
+ "percent-encoding",
+ "rand",
+ "regex",
+ "rusoto_core 0.47.0",
+ "rusoto_credential 0.47.0",
+ "rusoto_dynamodb",
+ "rusoto_sts",
+ "serde",
+ "serde_json",
+ "thiserror",
+ "tokio",
+ "url",
+ "uuid",
+]
+
 [[package]]
 name = "der"
 version = "0.6.1"
@@ -2829,6 +2887,22 @@ version = "1.0.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bbfc4744c1b8f2a09adc0e55242f60b1af195d88596bd8700be74418c056c555"
 
+[[package]]
+name = "dynamodb_lock"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fff628406c318f098017c78e203b5b6466e0610fc48be865ebb9ae0eb229b4c8"
+dependencies = [
+ "async-trait",
+ "log",
+ "maplit",
+ "rusoto_core 0.47.0",
+ "rusoto_dynamodb",
+ "thiserror",
+ "tokio",
+ "uuid",
+]
+
 [[package]]
 name = "ecdsa"
 version = "0.16.8"
@@ -4663,6 +4737,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "maplit"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
+
 [[package]]
 name = "matchers"
 version = "0.1.0"
@@ -5330,7 +5410,7 @@ dependencies = [
 [[package]]
 name = "parquet"
 version = "46.0.0"
-source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#30a13a193d24c5ea4c954f2092e81ee80a7af134"
+source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=46.0.0/parquet_bytes#0209c94ea4e3d924936e2127f89a885805aae1b4"
 dependencies = [
  "ahash 0.8.3",
  "arrow-array",
@@ -6286,6 +6366,31 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "rusoto_core"
+version = "0.47.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b4f000e8934c1b4f70adde180056812e7ea6b1a247952db8ee98c94cd3116cc"
+dependencies = [
+ "async-trait",
+ "base64 0.13.1",
+ "bytes",
+ "crc32fast",
+ "futures",
+ "http",
+ "hyper",
+ "hyper-rustls 0.22.1",
+ "lazy_static",
+ "log",
+ "rusoto_credential 0.47.0",
+ "rusoto_signature 0.47.0",
+ "rustc_version",
+ "serde",
+ "serde_json",
+ "tokio",
+ "xml-rs",
+]
+
 [[package]]
 name = "rusoto_core"
 version = "0.48.0"
@@ -6302,8 +6407,8 @@ dependencies = [
  "hyper-tls",
  "lazy_static",
  "log",
- "rusoto_credential",
- "rusoto_signature",
+ "rusoto_credential 0.48.0",
+ "rusoto_signature 0.48.0",
  "rustc_version",
  "serde",
  "serde_json",
@@ -6311,6 +6416,24 @@ dependencies = [
  "xml-rs",
 ]
 
+[[package]]
+name = "rusoto_credential"
+version = "0.47.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6a46b67db7bb66f5541e44db22b0a02fed59c9603e146db3a9e633272d3bac2f"
+dependencies = [
+ "async-trait",
+ "chrono",
+ "dirs-next",
+ "futures",
+ "hyper",
+ "serde",
+ "serde_json",
+ "shlex",
+ "tokio",
+ "zeroize",
+]
+
 [[package]]
 name = "rusoto_credential"
 version = "0.48.0"
@@ -6329,6 +6452,20 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "rusoto_dynamodb"
+version = "0.47.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7935e1f9ca57c4ee92a4d823dcd698eb8c992f7e84ca21976ae72cd2b03016e7"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures",
+ "rusoto_core 0.47.0",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "rusoto_s3"
 version = "0.48.0"
@@ -6338,10 +6475,36 @@ dependencies = [
  "async-trait",
  "bytes",
  "futures",
- "rusoto_core",
+ "rusoto_core 0.48.0",
  "xml-rs",
 ]
 
+[[package]]
+name = "rusoto_signature"
+version = "0.47.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6264e93384b90a747758bcc82079711eacf2e755c3a8b5091687b5349d870bcc"
+dependencies = [
+ "base64 0.13.1",
+ "bytes",
+ "chrono",
+ "digest 0.9.0",
+ "futures",
+ "hex",
+ "hmac 0.11.0",
+ "http",
+ "hyper",
+ "log",
+ "md-5 0.9.1",
+ "percent-encoding",
+ "pin-project-lite",
+ "rusoto_credential 0.47.0",
+ "rustc_version",
+ "serde",
+ "sha2 0.9.9",
+ "tokio",
+]
+
 [[package]]
 name = "rusoto_signature"
 version = "0.48.0"
@@ -6361,13 +6524,28 @@ dependencies = [
  "md-5 0.9.1",
  "percent-encoding",
  "pin-project-lite",
- "rusoto_credential",
+ "rusoto_credential 0.48.0",
  "rustc_version",
  "serde",
  "sha2 0.9.9",
  "tokio",
 ]
 
+[[package]]
+name = "rusoto_sts"
+version = "0.47.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e7edd42473ac006fd54105f619e480b0a94136e7f53cf3fb73541363678fd92"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "chrono",
+ "futures",
+ "rusoto_core 0.47.0",
+ "serde_urlencoded",
+ "xml-rs",
+]
+
 [[package]]
 name = "rust-embed"
 version = "6.8.1"
@@ -8185,6 +8363,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
 dependencies = [
  "getrandom",
+ "serde",
 ]
 
 [[package]]
diff --git a/arroyo-connectors/src/filesystem.rs b/arroyo-connectors/src/filesystem.rs
index 0563554ad..befc71f77 100644
--- a/arroyo-connectors/src/filesystem.rs
+++ b/arroyo-connectors/src/filesystem.rs
@@ -77,6 +77,18 @@ impl Connector for FileSystemConnector {
         table: Self::TableT,
         schema: Option<&ConnectionSchema>,
     ) -> anyhow::Result<crate::Connection> {
+        // the filesystem connector only supports the Direct commit style
+        if !matches!(
+            table
+                .file_settings
+                .as_ref()
+                .ok_or_else(|| anyhow!("no file_settings"))?
+                .commit_style,
+            Some(CommitStyle::Direct)
+        ) {
+            bail!("commit_style must be Direct");
+        }
+
         let backend_config = BackendConfig::parse_url(&table.write_target.path, true)?;
         let is_local = match &backend_config {
             BackendConfig::Local { .. } => true,
@@ -137,88 +149,92 @@ impl Connector for FileSystemConnector {
         opts: &mut std::collections::HashMap<String, String>,
         schema: Option<&ConnectionSchema>,
     ) -> anyhow::Result<crate::Connection> {
-        let storage_options: std::collections::HashMap<String, String> = opts
-            .iter()
-            .filter(|(k, _)| k.starts_with("storage."))
-            .map(|(k, v)| (k.trim_start_matches("storage.").to_string(), v.to_string()))
-            .collect();
-        opts.retain(|k, _| !k.starts_with("storage."));
-
-        let storage_url = pull_opt("path", opts)?;
-        BackendConfig::parse_url(&storage_url, true)?;
-
-        let inactivity_rollover_seconds = pull_option_to_i64("inactivity_rollover_seconds", opts)?;
-        let max_parts = pull_option_to_i64("max_parts", opts)?;
-        let rollover_seconds = pull_option_to_i64("rollover_seconds", opts)?;
-        let target_file_size = pull_option_to_i64("target_file_size", opts)?;
-        let target_part_size = pull_option_to_i64("target_part_size", opts)?;
-
-        let partition_fields: Vec<_> = opts
-            .remove("partition_fields")
-            .map(|fields| fields.split(',').map(|f| f.to_string()).collect())
-            .unwrap_or_default();
-
-        let time_partition_pattern = opts.remove("time_partition_pattern");
-
-        let partitioning = if time_partition_pattern.is_some() || !partition_fields.is_empty() {
-            Some(Partitioning {
-                time_partition_pattern,
-                partition_fields,
-            })
-        } else {
-            None
-        };
+        let table = file_system_table_from_options(opts, schema, CommitStyle::Direct)?;
 
-        let file_settings = Some(FileSettings {
-            inactivity_rollover_seconds,
-            max_parts,
-            rollover_seconds,
-            target_file_size,
-            target_part_size,
-            partitioning,
-        });
+        self.from_config(None, name, EmptyConfig {}, table, schema)
+    }
+}
 
-        let format_settings = match schema
-            .ok_or(anyhow!("require schema"))?
-            .format
-            .as_ref()
-            .ok_or(anyhow!(
-                "filesystem sink requires a format, such as json or parquet"
-            ))? {
-            Format::Parquet(..) => {
-                let compression = opts
-                    .remove("parquet_compression")
-                    .map(|value| {
-                        Compression::try_from(&value).map_err(|_err| {
-                            anyhow!("{} is not a valid parquet_compression argument", value)
-                        })
+pub fn file_system_table_from_options(
+    opts: &mut std::collections::HashMap<String, String>,
+    schema: Option<&ConnectionSchema>,
+    commit_style: CommitStyle,
+) -> Result<FileSystemTable> {
+    let storage_options: std::collections::HashMap<String, String> = opts
+        .iter()
+        .filter(|(k, _)| k.starts_with("storage."))
+        .map(|(k, v)| (k.trim_start_matches("storage.").to_string(), v.to_string()))
+        .collect();
+    opts.retain(|k, _| !k.starts_with("storage."));
+
+    let storage_url = pull_opt("path", opts)?;
+    BackendConfig::parse_url(&storage_url, true)?;
+
+    let inactivity_rollover_seconds = pull_option_to_i64("inactivity_rollover_seconds", opts)?;
+    let max_parts = pull_option_to_i64("max_parts", opts)?;
+    let rollover_seconds = pull_option_to_i64("rollover_seconds", opts)?;
+    let target_file_size = pull_option_to_i64("target_file_size", opts)?;
+    let target_part_size = pull_option_to_i64("target_part_size", opts)?;
+
+    let partition_fields: Vec<_> = opts
+        .remove("partition_fields")
+        .map(|fields| fields.split(',').map(|f| f.to_string()).collect())
+        .unwrap_or_default();
+
+    let time_partition_pattern = opts.remove("time_partition_pattern");
+
+    let partitioning = if time_partition_pattern.is_some() || !partition_fields.is_empty() {
+        Some(Partitioning {
+            time_partition_pattern,
+            partition_fields,
+        })
+    } else {
+        None
+    };
+
+    let file_settings = Some(FileSettings {
+        inactivity_rollover_seconds,
+        max_parts,
+        rollover_seconds,
+        target_file_size,
+        target_part_size,
+        partitioning,
+        commit_style: Some(commit_style),
+    });
+
+    let format_settings = match schema
+        .ok_or(anyhow!("require schema"))?
+        .format
+        .as_ref()
+        .ok_or(anyhow!(
+            "filesystem sink requires a format, such as json or parquet"
+        ))? {
+        Format::Parquet(..) => {
+            let compression = opts
+                .remove("parquet_compression")
+                .map(|value| {
+                    Compression::try_from(&value).map_err(|_err| {
+                        anyhow!("{} is not a valid parquet_compression argument", value)
                     })
-                    .transpose()?;
-                let row_batch_size = pull_option_to_i64("parquet_row_batch_size", opts)?;
-                let row_group_size = pull_option_to_i64("parquet_row_group_size", opts)?;
-                Some(FormatSettings::Parquet {
-                    compression,
-                    row_batch_size,
-                    row_group_size,
                 })
-            }
-            Format::Json(..) => Some(FormatSettings::Json {}),
-            other => bail!("Unsupported format: {:?}", other),
-        };
-
-        self.from_config(
-            None,
-            name,
-            EmptyConfig {},
-            FileSystemTable {
-                write_target: FolderUrl {
-                    path: storage_url,
-                    storage_options,
-                },
-                file_settings,
-                format_settings,
-            },
-            schema,
-        )
-    }
+                .transpose()?;
+            let row_batch_size = pull_option_to_i64("parquet_row_batch_size", opts)?;
+            let row_group_size = pull_option_to_i64("parquet_row_group_size", opts)?;
+            Some(FormatSettings::Parquet {
+                compression,
+                row_batch_size,
+                row_group_size,
+            })
+        }
+        Format::Json(..) => Some(FormatSettings::Json {}),
+        other => bail!("Unsupported format: {:?}", other),
+    };
+    Ok(FileSystemTable {
+        write_target: FolderUrl {
+            path: storage_url,
+            storage_options,
+        },
+        file_settings,
+        format_settings,
+    })
 }
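
The option parsing above is now shared. A hedged sketch of how the new Delta
connector (arroyo-connectors/src/delta.rs, added in this patch but not shown
in this hunk) is expected to reuse it; the helper name is hypothetical:

    // Hypothetical helper inside the Delta connector's from_options():
    fn delta_table_from_options(
        opts: &mut std::collections::HashMap<String, String>,
        schema: Option<&ConnectionSchema>,
    ) -> anyhow::Result<FileSystemTable> {
        // Same option handling as the filesystem connector, but the commit
        // style is pinned to DeltaLake so the sink commits per operator.
        file_system_table_from_options(opts, schema, CommitStyle::DeltaLake)
    }
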
diff --git a/arroyo-connectors/src/lib.rs b/arroyo-connectors/src/lib.rs
index e7dc7ce7a..954d332d3 100644
--- a/arroyo-connectors/src/lib.rs
+++ b/arroyo-connectors/src/lib.rs
@@ -22,6 +22,7 @@ use websocket::WebsocketConnector;
 use self::kafka::KafkaConnector;
 
 pub mod blackhole;
+pub mod delta;
 pub mod filesystem;
 pub mod fluvio;
 pub mod impulse;
@@ -36,6 +37,7 @@ pub mod websocket;
 pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
     let mut m: HashMap<&'static str, Box<dyn ErasedConnector>> = HashMap::new();
     m.insert("blackhole", Box::new(BlackholeConnector {}));
+    m.insert("delta", Box::new(delta::DeltaLakeConnector {}));
     m.insert("filesystem", Box::new(filesystem::FileSystemConnector {}));
     m.insert("fluvio", Box::new(FluvioConnector {}));
     m.insert("impulse", Box::new(ImpulseConnector {}));
diff --git a/arroyo-controller/src/job_controller/mod.rs b/arroyo-controller/src/job_controller/mod.rs
index faede1210..011e94bf5 100644
--- a/arroyo-controller/src/job_controller/mod.rs
+++ b/arroyo-controller/src/job_controller/mod.rs
@@ -8,8 +8,8 @@ use crate::types::public::StopMode as SqlStopMode;
 use anyhow::bail;
 use arroyo_datastream::Program;
 use arroyo_rpc::grpc::{
-    worker_grpc_client::WorkerGrpcClient, CheckpointReq, JobFinishedReq, LoadCompactedDataReq,
-    StopExecutionReq, StopMode, TaskCheckpointEventType,
+    worker_grpc_client::WorkerGrpcClient, CheckpointReq, CommitReq, JobFinishedReq,
+    LoadCompactedDataReq, StopExecutionReq, StopMode, TaskCheckpointEventType,
 };
 use arroyo_state::{BackingStore, StateBackend};
 use arroyo_types::{to_micros, WorkerId};
@@ -433,6 +433,7 @@ impl RunningJobModel {
                             DbCheckpointState::committing,
                         )
                         .await?;
+                        let committing_data = committing_state.committing_data();
                         self.checkpoint_state =
                             Some(CheckpointingOrCommittingState::Committing(committing_state));
                         info!(
@@ -443,12 +444,9 @@ impl RunningJobModel {
                         for worker in self.workers.values_mut() {
                             worker
                                 .connect
-                                .checkpoint(Request::new(CheckpointReq {
-                                    timestamp: to_micros(SystemTime::now()),
-                                    min_epoch: self.min_epoch,
+                                .commit(Request::new(CommitReq {
                                     epoch: self.epoch,
-                                    then_stop: false,
-                                    is_commit: true,
+                                    committing_data: committing_data.clone(),
                                 }))
                                 .await?;
                         }
@@ -707,7 +705,7 @@ impl JobController {
     }
 
     pub async fn send_commit_messages(&mut self) -> anyhow::Result<()> {
-        let Some(CheckpointingOrCommittingState::Committing(_committing)) =
+        let Some(CheckpointingOrCommittingState::Committing(committing)) =
             &self.model.checkpoint_state
         else {
             bail!("should be committing")
@@ -715,13 +713,10 @@ impl JobController {
         for worker in self.model.workers.values_mut() {
             worker
                 .connect
-                .checkpoint(Request::new(CheckpointReq {
-                    timestamp: to_micros(SystemTime::now()),
-                    min_epoch: self.model.min_epoch,
+                .commit(CommitReq {
                     epoch: self.model.epoch,
-                    then_stop: false,
-                    is_commit: true,
-                }))
+                    committing_data: committing.committing_data(),
+                })
                 .await?;
         }
         Ok(())
diff --git a/arroyo-controller/src/states/scheduling.rs b/arroyo-controller/src/states/scheduling.rs
index 52982e16a..512497d38 100644
--- a/arroyo-controller/src/states/scheduling.rs
+++ b/arroyo-controller/src/states/scheduling.rs
@@ -14,7 +14,9 @@ use tonic::{transport::Channel, Request};
 use tracing::{error, info, warn};
 
 use anyhow::anyhow;
-use arroyo_state::{parquet::get_storage_env_vars, BackingStore, StateBackend};
+use arroyo_state::{
+    committing_state::CommittingState, parquet::get_storage_env_vars, BackingStore, StateBackend,
+};
 
 use crate::{
     job_controller::JobController,
@@ -368,6 +370,7 @@ impl State for Scheduling {
             metadata.min_epoch = min_epoch;
             if needs_commits {
                 let mut commit_subtasks = HashSet::new();
+                let mut committing_data = HashMap::new();
                 for operator_id in &metadata.operator_ids {
                     let operator_metadata =
                         StateBackend::load_operator_metadata(&ctx.config.id, operator_id, epoch)
@@ -378,6 +381,18 @@ impl State for Scheduling {
                             operator_id, ctx.config.id
                         );
                     };
+                    if let Some(commit_data) = operator_metadata.commit_data {
+                        committing_data.insert(
+                            operator_id.clone(),
+                            commit_data
+                                .committing_data
+                                .into_iter()
+                                .map(|(table, commit_data_map)| {
+                                    (table, commit_data_map.commit_data_by_subtask)
+                                })
+                                .collect(),
+                        );
+                    }
                     if operator_metadata.has_state
                         && operator_metadata
                             .tables
@@ -396,7 +411,7 @@ impl State for Scheduling {
                         }
                     }
                 }
-                committing_state = Some((id, commit_subtasks));
+                committing_state = Some(CommittingState::new(id, commit_subtasks, committing_data));
             }
             StateBackend::write_checkpoint_metadata(metadata).await;
         }
diff --git a/arroyo-macro/src/lib.rs b/arroyo-macro/src/lib.rs
index a58f36c08..c35240e67 100644
--- a/arroyo-macro/src/lib.rs
+++ b/arroyo-macro/src/lib.rs
@@ -529,8 +529,8 @@ fn impl_stream_node_type(
                         match control_message {
                             arroyo_rpc::ControlMessage::Checkpoint(_) => tracing::warn!("shouldn't receive checkpoint"),
                             arroyo_rpc::ControlMessage::Stop { mode: _ } => tracing::warn!("shouldn't receive stop"),
-                            arroyo_rpc::ControlMessage::Commit { epoch } => {
-                                self.handle_commit(epoch, &mut ctx).await;
+                            arroyo_rpc::ControlMessage::Commit { epoch, commit_data } => {
+                                self.handle_commit(epoch, commit_data, &mut ctx).await;
                             },
                             arroyo_rpc::ControlMessage::LoadCompacted { compacted } => {
                                 ctx.load_compacted(compacted).await;
@@ -802,7 +802,7 @@ fn impl_stream_node_type(
 
     if !methods.contains("handle_commit") {
         defs.push(quote! {
-            async fn handle_commit(&mut self, epoch: u32, ctx: &mut Context<#out_k, #out_t>) {
+            async fn handle_commit(&mut self, epoch: u32, commit_data: std::collections::HashMap<char, std::collections::HashMap<u32, Vec<u8>>>, ctx: &mut Context<#out_k, #out_t>) {
                 tracing::warn!("default handling of commit with epoch {:?}", epoch);
             }
         })
diff --git a/arroyo-rpc/proto/rpc.proto b/arroyo-rpc/proto/rpc.proto
index ca46e5abd..8441d767e 100644
--- a/arroyo-rpc/proto/rpc.proto
+++ b/arroyo-rpc/proto/rpc.proto
@@ -224,6 +224,9 @@ message SubtaskCheckpointMetadata {
   uint64 bytes = 7;
 
   repeated BackendData backend_data = 8;
+  // Data that should be sent along with the commit message
+  // to all subtasks. The key is the table name.
+  map<string,bytes> committing_data = 9;
 }
 
 message BackendData {
@@ -246,6 +249,7 @@ message OperatorCheckpointMetadata {
 
   repeated BackendData backend_data = 10;
   uint64 bytes = 11;
+  OperatorCommitData commit_data = 12;
 }
 
 enum TableType {
@@ -315,6 +319,23 @@ message CheckpointReq {
 message CheckpointResp {
 }
 
+message CommitReq {
+  uint32 epoch = 1;
+  map<string, OperatorCommitData> committing_data = 2;
+}
+
+message CommitResp {
+}
+
+message OperatorCommitData {
+  // map from table name to commit data for that table.
+  map<string, TableCommitData> committing_data = 1;
+}
+
+message TableCommitData {
+  map<uint32, bytes> commit_data_by_subtask = 1;
+}
+
 message LoadCompactedDataReq {
   string operator_id = 1;
   repeated BackendData backend_data_to_drop = 2; // this data got compressed...
@@ -347,6 +368,7 @@ message JobFinishedResp {
 service WorkerGrpc {
   rpc StartExecution(StartExecutionReq) returns (StartExecutionResp);
   rpc Checkpoint(CheckpointReq) returns (CheckpointResp);
+  rpc Commit(CommitReq) returns (CommitResp);
   rpc LoadCompactedData(LoadCompactedDataReq) returns (LoadCompactedDataRes);
   rpc StopExecution(StopExecutionReq) returns (StopExecutionResp);
   rpc JobFinished(JobFinishedReq) returns (JobFinishedResp);
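
For reference, the nesting of the new commit payload, written out as plain
Rust (illustrative only; the real types are generated from the proto above):

    use std::collections::HashMap;

    // TableCommitData: subtask index -> opaque, connector-defined commit bytes.
    struct TableCommitData {
        commit_data_by_subtask: HashMap<u32, Vec<u8>>,
    }

    // OperatorCommitData: table name -> TableCommitData.
    struct OperatorCommitData {
        committing_data: HashMap<String, TableCommitData>,
    }

    // CommitReq: sent by the controller to every worker once checkpointing has
    // finished; operator id -> OperatorCommitData.
    struct CommitReq {
        epoch: u32,
        committing_data: HashMap<String, OperatorCommitData>,
    }
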
diff --git a/arroyo-rpc/src/lib.rs b/arroyo-rpc/src/lib.rs
index ca4431030..78c832abd 100644
--- a/arroyo-rpc/src/lib.rs
+++ b/arroyo-rpc/src/lib.rs
@@ -2,6 +2,7 @@ pub mod api_types;
 pub mod formats;
 pub mod public_ids;
 
+use std::collections::HashMap;
 use std::{fs, time::SystemTime};
 
 use crate::api_types::connections::PrimitiveType;
@@ -32,9 +33,16 @@ pub mod grpc {
 #[derive(Debug)]
 pub enum ControlMessage {
     Checkpoint(CheckpointBarrier),
-    Stop { mode: StopMode },
-    Commit { epoch: u32 },
-    LoadCompacted { compacted: CompactionResult },
+    Stop {
+        mode: StopMode,
+    },
+    Commit {
+        epoch: u32,
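+        // table (identified by its char) -> subtask index -> serialized pre-commit data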
+        commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
+    },
+    LoadCompacted {
+        compacted: CompactionResult,
+    },
     NoOp,
 }
 
diff --git a/arroyo-state/src/checkpoint_state.rs b/arroyo-state/src/checkpoint_state.rs
index 08899db6e..e680cd069 100644
--- a/arroyo-state/src/checkpoint_state.rs
+++ b/arroyo-state/src/checkpoint_state.rs
@@ -2,8 +2,8 @@ use crate::committing_state::CommittingState;
 use crate::subtask_state::SubtaskState;
 use crate::{BackingStore, StateBackend};
 use anyhow::{anyhow, bail};
-use arroyo_rpc::grpc;
 use arroyo_rpc::grpc::api::OperatorCheckpointDetail;
+use arroyo_rpc::grpc::{self, OperatorCommitData, TableCommitData};
 use arroyo_rpc::grpc::{
     api, backend_data, BackendData, CheckpointMetadata, OperatorCheckpointMetadata,
     TableDescriptor, TableWriteBehavior, TaskCheckpointCompletedReq, TaskCheckpointEventReq,
@@ -23,6 +23,7 @@ pub struct CheckpointState {
     tasks: HashMap<String, BTreeMap<u32, SubtaskState>>,
     completed_operators: HashSet<String>,
     subtasks_to_commit: HashSet<(String, u32)>,
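+    // operator_id -> table name -> subtask index -> serialized commit data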
+    committing_backend_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
 
     // Used for the web ui -- eventually should be replaced with some other way of tracking / reporting
     // this data
@@ -47,6 +48,7 @@ impl CheckpointState {
             tasks: HashMap::new(),
             completed_operators: HashSet::new(),
             subtasks_to_commit: HashSet::new(),
+            committing_backend_data: HashMap::new(),
             operator_details: HashMap::new(),
         }
     }
@@ -174,6 +176,14 @@ impl CheckpointState {
             });
         detail.bytes = Some(metadata.bytes);
         detail.finish_time = Some(metadata.finish_time);
+        for (table, committing_data) in &metadata.committing_data {
+            self.committing_backend_data
+                .entry(c.operator_id.clone())
+                .or_default()
+                .entry(table.to_string())
+                .or_default()
+                .insert(metadata.subtask_index, committing_data.clone());
+        }
 
         // this is for the actual checkpoint management
 
@@ -281,6 +291,27 @@ impl CheckpointState {
             tables: tables.into_values().collect(),
             backend_data: backend_data.into_values().collect(),
             bytes: size,
+            commit_data: self
+                .committing_backend_data
+                .get(&operator_id)
+                .map(|commit_data| OperatorCommitData {
+                    committing_data: commit_data
+                        .iter()
+                        .map(|(table_name, subtask_to_commit_data)| {
+                            (
+                                table_name.clone(),
+                                TableCommitData {
+                                    commit_data_by_subtask: subtask_to_commit_data
+                                        .iter()
+                                        .map(|(subtask_index, commit_data)| {
+                                            (*subtask_index, commit_data.clone())
+                                        })
+                                        .collect(),
+                                },
+                            )
+                        })
+                        .collect(),
+                }),
         })
         .await;
 
@@ -307,7 +338,11 @@ impl CheckpointState {
     }
 
     pub fn committing_state(&self) -> CommittingState {
-        CommittingState::new(self.checkpoint_id, self.subtasks_to_commit.clone())
+        CommittingState::new(
+            self.checkpoint_id,
+            self.subtasks_to_commit.clone(),
+            self.committing_backend_data.clone(),
+        )
     }
 
     pub async fn save_state(&self) -> anyhow::Result<()> {
diff --git a/arroyo-state/src/committing_state.rs b/arroyo-state/src/committing_state.rs
index 1439a6c9b..50cda5160 100644
--- a/arroyo-state/src/committing_state.rs
+++ b/arroyo-state/src/committing_state.rs
@@ -1,15 +1,23 @@
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
+
+use arroyo_rpc::grpc::{OperatorCommitData, TableCommitData};
 
 pub struct CommittingState {
     checkpoint_id: i64,
     subtasks_to_commit: HashSet<(String, u32)>,
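+    // operator_id -> table name -> subtask index -> serialized commit data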
+    committing_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
 }
 
 impl CommittingState {
-    pub fn new(checkpoint_id: i64, subtasks_to_commit: HashSet<(String, u32)>) -> Self {
+    pub fn new(
+        checkpoint_id: i64,
+        subtasks_to_commit: HashSet<(String, u32)>,
+        committing_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
+    ) -> Self {
         Self {
             checkpoint_id,
             subtasks_to_commit,
+            committing_data,
         }
     }
 
@@ -25,13 +33,35 @@ impl CommittingState {
     pub fn done(&self) -> bool {
         self.subtasks_to_commit.is_empty()
     }
-}
 
-impl From<(i64, HashSet<(String, u32)>)> for CommittingState {
-    fn from((checkpoint_id, subtasks_to_commit): (i64, HashSet<(String, u32)>)) -> Self {
-        Self {
-            checkpoint_id,
-            subtasks_to_commit,
-        }
+    pub fn committing_data(&self) -> HashMap<String, OperatorCommitData> {
+        let operators_to_commit: HashSet<_> = self
+            .subtasks_to_commit
+            .iter()
+            .map(|(operator_id, _subtask_id)| operator_id.clone())
+            .collect();
+        operators_to_commit
+            .into_iter()
+            .map(|operator_id| {
+                let committing_data = self
+                    .committing_data
+                    .get(&operator_id)
+                    .map(|table_map| {
+                        table_map
+                            .iter()
+                            .map(|(table_name, subtask_to_commit_data)| {
+                                (
+                                    table_name.clone(),
+                                    TableCommitData {
+                                        commit_data_by_subtask: subtask_to_commit_data.clone(),
+                                    },
+                                )
+                            })
+                            .collect()
+                    })
+                    .unwrap_or_default();
+                (operator_id, OperatorCommitData { committing_data })
+            })
+            .collect()
     }
 }
diff --git a/arroyo-state/src/lib.rs b/arroyo-state/src/lib.rs
index c9e638a8c..20b04b4cb 100644
--- a/arroyo-state/src/lib.rs
+++ b/arroyo-state/src/lib.rs
@@ -202,6 +202,7 @@ pub trait BackingStore {
     async fn get_key_values<K: Key, V: Data>(&self, table: char) -> Vec<(K, V)>;
 
     async fn load_compacted(&mut self, compaction: CompactionResult);
+    async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>);
 }
 
 pub struct StateStore<S: BackingStore> {
@@ -416,6 +417,12 @@ impl<S: BackingStore> StateStore<S> {
     pub async fn load_compacted(&mut self, compaction: CompactionResult) {
         self.backend.load_compacted(compaction).await;
     }
+
+    pub async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>) {
+        self.backend
+            .insert_committing_data(table, committing_data)
+            .await
+    }
 }
 
 #[cfg(test)]
@@ -558,6 +565,7 @@ mod test {
             tables: default_tables(),
             backend_data: message.subtask_metadata.backend_data,
             bytes: 5,
+            commit_data: None,
         })
         .await;
 
diff --git a/arroyo-state/src/parquet.rs b/arroyo-state/src/parquet.rs
index 7858a925a..aee84f773 100644
--- a/arroyo-state/src/parquet.rs
+++ b/arroyo-state/src/parquet.rs
@@ -483,6 +483,17 @@ impl BackingStore for ParquetBackend {
     async fn load_compacted(&mut self, compaction: CompactionResult) {
         self.writer.load_compacted_data(compaction).await;
     }
+
+    async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>) {
+        self.writer
+            .sender
+            .send(ParquetQueueItem::CommitData {
+                table,
+                data: committing_data,
+            })
+            .await
+            .unwrap();
+    }
 }
 
 impl ParquetBackend {
@@ -1017,6 +1028,7 @@ impl ParquetWriter {
                 .map(|table| (table.name.chars().next().unwrap(), table.clone()))
                 .collect(),
             builders: HashMap::new(),
+            commit_data: HashMap::new(),
             current_files,
             load_compacted_tx,
             new_compacted: vec![],
@@ -1085,6 +1097,7 @@ impl ParquetWriter {
 enum ParquetQueueItem {
     Write(ParquetWrite),
     Checkpoint(ParquetCheckpoint),
+    CommitData { table: char, data: Vec<u8> },
 }
 
 #[derive(Debug)]
@@ -1225,6 +1238,7 @@ struct ParquetFlusher {
     task_info: TaskInfo,
     table_descriptors: HashMap<char, TableDescriptor>,
     builders: HashMap<char, RecordBatchBuilder>,
+    commit_data: HashMap<char, Vec<u8>>, // table -> commit bytes for this subtask's next checkpoint
     current_files: HashMap<char, BTreeMap<u32, Vec<ParquetStoreData>>>, // table -> epoch -> file
     load_compacted_tx: Receiver<CompactionResult>,
     new_compacted: Vec<ParquetStoreData>,
@@ -1329,6 +1343,9 @@ impl ParquetFlusher {
                         Some(ParquetQueueItem::Checkpoint(epoch)) => {
                             checkpoint_epoch = Some(epoch);
                         },
+                        Some(ParquetQueueItem::CommitData{table, data}) => {
+                            self.commit_data.insert(table, data);
+                        }
                         None => {
                             debug!("Parquet flusher closed");
                             return Ok(false);
@@ -1440,6 +1457,11 @@ impl ParquetFlusher {
                 watermark: cp.watermark.map(to_micros),
                 backend_data: checkpoint_backend_data,
                 bytes: bytes as u64,
+                committing_data: self
+                    .commit_data
+                    .drain()
+                    .map(|(table, data)| (table.to_string(), data))
+                    .collect(),
             };
             self.control_tx
                 .send(ControlResp::CheckpointCompleted(CheckpointCompleted {
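
The flusher above simply forwards whatever commit bytes the operator stashed
during checkpointing. A hedged sketch of the producing side (the table char,
the variable names, and the bincode encoding are assumptions; the real wiring
lives in TwoPhaseCommitterOperator):

    // Inside a two-phase sink's checkpoint path: serialize the files that
    // still need to be finished and hand the bytes to the state backend so
    // they end up in SubtaskCheckpointMetadata.committing_data.
    let bytes = bincode::encode_to_vec(&pre_commit_messages, bincode::config::standard())?;
    state_store.insert_committing_data('p', bytes).await;
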
diff --git a/arroyo-storage/src/lib.rs b/arroyo-storage/src/lib.rs
index b8a8ddeb6..adacbdb86 100644
--- a/arroyo-storage/src/lib.rs
+++ b/arroyo-storage/src/lib.rs
@@ -8,22 +8,23 @@ use std::{
 use arroyo_types::{S3_ENDPOINT_ENV, S3_REGION_ENV};
 use aws::ArroyoCredentialProvider;
 use bytes::Bytes;
-use object_store::aws::AmazonS3ConfigKey;
+use object_store::aws::{AmazonS3ConfigKey, AwsCredential};
 use object_store::gcp::GoogleCloudStorageBuilder;
 use object_store::multipart::PartId;
 use object_store::path::Path;
-use object_store::MultipartId;
 use object_store::{aws::AmazonS3Builder, local::LocalFileSystem, ObjectStore};
+use object_store::{CredentialProvider, MultipartId};
 use regex::{Captures, Regex};
 use thiserror::Error;
 
 mod aws;
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct StorageProvider {
     config: BackendConfig,
     object_store: Arc<dyn ObjectStore>,
     canonical_url: String,
+    storage_options: HashMap<String, String>,
 }
 
 #[derive(Error, Debug)]
@@ -263,6 +264,12 @@ fn last<I: Sized, const COUNT: usize>(opts: [Option<I>; COUNT]) -> Option<I> {
     opts.into_iter().flatten().last()
 }
 
+pub async fn get_current_credentials() -> Result<Arc<AwsCredential>, StorageError> {
+    let provider = ArroyoCredentialProvider::try_new()?;
+    let credentials = provider.get_credential().await?;
+    Ok(credentials)
+}
+
 impl StorageProvider {
     pub async fn for_url(url: &str) -> Result<Self, StorageError> {
         Self::for_url_with_options(url, HashMap::new()).await
@@ -279,6 +286,7 @@ impl StorageProvider {
             BackendConfig::Local(config) => Self::construct_local(config).await,
         }
     }
+
     pub async fn get_url(url: &str) -> Result<Bytes, StorageError> {
         Self::get_url_with_options(url, HashMap::new()).await
     }
@@ -324,6 +332,7 @@ impl StorageProvider {
     ) -> Result<Self, StorageError> {
         let mut builder = AmazonS3Builder::from_env().with_bucket_name(&config.bucket);
         let mut aws_key_manually_set = false;
+        let mut s3_options = HashMap::new();
         for (key, value) in options {
             let s3_config_key = key.parse().map_err(|_| {
                 StorageError::CredentialsError(format!("invalid S3 config key: {}", key))
@@ -331,10 +340,13 @@ impl StorageProvider {
             if AmazonS3ConfigKey::AccessKeyId == s3_config_key {
                 aws_key_manually_set = true;
             }
+            s3_options.insert(s3_config_key.clone(), value.clone());
             builder = builder.with_config(s3_config_key, value);
         }
+
         if !aws_key_manually_set {
-            let credentials = Arc::new(ArroyoCredentialProvider::try_new()?);
+            let credentials: Arc<ArroyoCredentialProvider> =
+                Arc::new(ArroyoCredentialProvider::try_new()?);
             builder = builder.with_credentials(credentials);
         }
 
@@ -342,6 +354,7 @@ impl StorageProvider {
         config.region = config.region.or(default_region);
         if let Some(region) = &config.region {
             builder = builder.with_region(region);
+            s3_options.insert(AmazonS3ConfigKey::Region, region.clone());
         }
 
         if let Some(endpoint) = &config.endpoint {
@@ -349,24 +362,27 @@ impl StorageProvider {
                 .with_endpoint(endpoint)
                 .with_virtual_hosted_style_request(false)
                 .with_allow_http(true);
+            s3_options.insert(AmazonS3ConfigKey::Endpoint, endpoint.clone());
+            s3_options.insert(
+                AmazonS3ConfigKey::VirtualHostedStyleRequest,
+                "false".to_string(),
+            );
+            s3_options.insert(
+                AmazonS3ConfigKey::Client(object_store::ClientConfigKey::AllowHttp),
+                "true".to_string(),
+            );
         }
 
-        let canonical_url = match (&config.region, &config.endpoint) {
-            (_, Some(endpoint)) => {
-                format!("s3::{}/{}", endpoint, config.bucket)
-            }
-            (Some(region), _) => {
-                format!("https://s3.{}.amazonaws.com/{}", region, config.bucket)
-            }
-            _ => {
-                format!("https://s3.amazonaws.com/{}", config.bucket)
-            }
-        };
+        let canonical_url = format!("s3://{}", config.bucket);
 
         Ok(Self {
             config: BackendConfig::S3(config),
             object_store: Arc::new(builder.build().map_err(|e| Into::<StorageError>::into(e))?),
             canonical_url,
+            storage_options: s3_options
+                .into_iter()
+                .map(|(k, v)| (k.as_ref().to_string(), v))
+                .collect(),
         })
     }
 
@@ -381,6 +397,7 @@ impl StorageProvider {
             config: BackendConfig::GCS(config),
             object_store: Arc::new(gcs),
             canonical_url,
+            storage_options: HashMap::new(),
         })
     }
 
@@ -402,6 +419,7 @@ impl StorageProvider {
             config: BackendConfig::Local(config),
             object_store,
             canonical_url,
+            storage_options: HashMap::new(),
         })
     }
 
@@ -483,10 +501,17 @@ impl StorageProvider {
     pub fn canonical_url(&self) -> &str {
         &self.canonical_url
     }
+    pub fn storage_options(&self) -> &HashMap<String, String> {
+        &self.storage_options
+    }
 
     pub fn config(&self) -> &BackendConfig {
         &self.config
     }
+
+    pub fn get_backing_store(&self) -> Arc<dyn ObjectStore> {
+        self.object_store.clone()
+    }
 }
 
 #[cfg(test)]
diff --git a/arroyo-worker/Cargo.toml b/arroyo-worker/Cargo.toml
index f5f1adcb0..979e646fd 100644
--- a/arroyo-worker/Cargo.toml
+++ b/arroyo-worker/Cargo.toml
@@ -40,6 +40,7 @@ md-5 = "0.10"
 hex = "0.4"
 url = "2.4.0"
 ordered-float = "3"
+deltalake = {version = "0.16.0", features = ["s3", "arrow"] }
 arrow = { workspace = true }
 parquet = { workspace = true, features = ["async"]}
 arrow-array = { workspace = true}
diff --git a/arroyo-worker/src/connectors/filesystem/delta.rs b/arroyo-worker/src/connectors/filesystem/delta.rs
new file mode 100644
index 000000000..d4cbcc4ee
--- /dev/null
+++ b/arroyo-worker/src/connectors/filesystem/delta.rs
@@ -0,0 +1,196 @@
+use super::FinishedFile;
+use anyhow::{Context, Result};
+use arrow::datatypes::Schema;
+use arroyo_storage::{get_current_credentials, StorageProvider};
+use arroyo_types::to_millis;
+use deltalake::{
+    operations::create::CreateBuilder,
+    protocol::{Action, Add, SaveMode},
+    table::{builder::s3_storage_options::AWS_S3_ALLOW_UNSAFE_RENAME, PeekCommit},
+    DeltaTableBuilder,
+};
+use object_store::{aws::AmazonS3ConfigKey, path::Path};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+    time::SystemTime,
+};
+
+pub(crate) async fn commit_files_to_delta(
+    finished_files: Vec<FinishedFile>,
+    relative_table_path: Path,
+    storage_provider: Arc<StorageProvider>,
+    last_version: i64,
+    schema: Schema,
+) -> Result<Option<i64>> {
+    if finished_files.is_empty() {
+        return Ok(None);
+    }
+
+    let add_actions = create_add_actions(&finished_files, &relative_table_path)?;
+    let table_path = build_table_path(&storage_provider, &relative_table_path);
+    let storage_options = configure_storage_options(&table_path, storage_provider.clone()).await?;
+    let mut table = load_or_create_table(&table_path, storage_options.clone(), &schema).await?;
+
+    if let Some(new_version) = check_existing_files(
+        &mut table,
+        last_version,
+        &finished_files,
+        &relative_table_path,
+    )
+    .await?
+    {
+        return Ok(Some(new_version));
+    }
+
+    let new_version = commit_to_delta(table, add_actions).await?;
+    Ok(Some(new_version))
+}
+
+async fn load_or_create_table(
+    table_path: &str,
+    storage_options: HashMap<String, String>,
+    schema: &Schema,
+) -> Result<deltalake::DeltaTable> {
+    match DeltaTableBuilder::from_uri(table_path)
+        .with_storage_options(storage_options.clone())
+        .load()
+        .await
+    {
+        Ok(table) => Ok(table),
+        Err(deltalake::DeltaTableError::NotATable(_)) => {
+            create_new_table(table_path, storage_options, schema).await
+        }
+        Err(err) => Err(err.into()),
+    }
+}
+
+async fn create_new_table(
+    table_path: &str,
+    storage_options: HashMap<String, String>,
+    schema: &Schema,
+) -> Result<deltalake::DeltaTable> {
+    let delta_object_store = DeltaTableBuilder::from_uri(table_path)
+        .with_storage_options(storage_options)
+        .build_storage()?;
+    let delta_schema: deltalake::Schema = (schema).try_into()?;
+    CreateBuilder::new()
+        .with_object_store(delta_object_store)
+        .with_columns(delta_schema.get_fields().clone())
+        .await
+        .map_err(Into::into)
+}
+
+async fn configure_storage_options(
+    table_path: &str,
+    storage_provider: Arc<StorageProvider>,
+) -> Result<HashMap<String, String>> {
+    let mut options = storage_provider.storage_options().clone();
+    if table_path.starts_with("s3://") {
+        update_s3_credentials(&mut options).await?;
+    }
+    Ok(options)
+}
+
+async fn update_s3_credentials(options: &mut HashMap<String, String>) -> Result<()> {
+    if !options.contains_key(AmazonS3ConfigKey::SecretAccessKey.as_ref()) {
+        let tmp_credentials = get_current_credentials().await?;
+        options.insert(
+            AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
+            tmp_credentials.key_id.clone(),
+        );
+        options.insert(
+            AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
+            tmp_credentials.secret_key.clone(),
+        );
+    }
+    options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string());
+    Ok(())
+}
+
+fn create_add_actions(
+    finished_files: &[FinishedFile],
+    relative_table_path: &Path,
+) -> Result<Vec<Action>> {
+    finished_files
+        .iter()
+        .map(|file| create_add_action(file, relative_table_path))
+        .collect()
+}
+
+fn create_add_action(file: &FinishedFile, relative_table_path: &Path) -> Result<Action> {
+    let subpath = file
+        .filename
+        .strip_prefix(&relative_table_path.to_string())
+        .context(format!(
+            "File {} is not in table {}",
+            file.filename,
+            relative_table_path.to_string()
+        ))?;
+    Ok(Action::add(Add {
+        path: subpath.to_string(),
+        size: file.size as i64,
+        partition_values: HashMap::new(),
+        modification_time: to_millis(SystemTime::now()) as i64,
+        data_change: true,
+        ..Default::default()
+    }))
+}
+
+async fn check_existing_files(
+    table: &mut deltalake::DeltaTable,
+    last_version: i64,
+    finished_files: &[FinishedFile],
+    relative_table_path: &Path,
+) -> Result<Option<i64>> {
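+    // If the table already contains a version newer than the one we last
+    // committed, and that version added any of our files, a previous attempt
+    // at this commit already succeeded; return that version rather than
+    // committing the same files twice.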
+    if last_version >= table.version() {
+        return Ok(None);
+    }
+
+    let files: HashSet<_> = finished_files
+        .iter()
+        .map(|file| {
+            file.filename
+                .strip_prefix(&relative_table_path.to_string())
+                .unwrap()
+                .to_string()
+        })
+        .collect();
+
+    let mut version_to_check = last_version;
+    while let PeekCommit::New(version, actions) = table.peek_next_commit(version_to_check).await? {
+        for action in actions {
+            if let Action::add(add) = action {
+                if files.contains(&add.path) {
+                    return Ok(Some(version));
+                }
+            }
+        }
+        version_to_check = version;
+    }
+    Ok(None)
+}
+
+async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec<Action>) -> Result<i64> {
+    deltalake::operations::transaction::commit(
+        table.object_store().as_ref(),
+        &add_actions,
+        deltalake::protocol::DeltaOperation::Write {
+            mode: SaveMode::Append,
+            partition_by: None,
+            predicate: None,
+        },
+        table.get_state(),
+        None,
+    )
+    .await
+    .map_err(Into::into)
+}
+
+fn build_table_path(storage_provider: &StorageProvider, relative_table_path: &Path) -> String {
+    format!(
+        "{}/{}",
+        storage_provider.canonical_url(),
+        relative_table_path
+    )
+}
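
A hedged usage sketch of the helper above (the table path and variable names
are placeholders; in this patch the call is driven by the Delta sink's commit
path in filesystem/mod.rs):

    // Commit the finished parquet files for this epoch to the Delta log,
    // keeping the previous version if there was nothing new to commit.
    let new_version = commit_files_to_delta(
        finished_files,                  // Vec<FinishedFile> produced by the writer
        Path::from("warehouse/events"),  // table path relative to the storage root
        Arc::new(storage_provider),      // StorageProvider for the sink's destination
        last_version,                    // last Delta version this operator committed
        schema,                          // Arrow schema of the written files
    )
    .await?
    .unwrap_or(last_version);
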
diff --git a/arroyo-worker/src/connectors/filesystem/mod.rs b/arroyo-worker/src/connectors/filesystem/mod.rs
index b1f315a15..48bff442c 100644
--- a/arroyo-worker/src/connectors/filesystem/mod.rs
+++ b/arroyo-worker/src/connectors/filesystem/mod.rs
@@ -24,18 +24,21 @@ use typify::import_types;
 import_types!(schema = "../connector-schemas/filesystem/table.json");
 
 use arroyo_types::*;
+mod delta;
 pub mod json;
 pub mod local;
 pub mod parquet;
 pub mod single_file;
 
+use crate::SchemaData;
+
 use self::{
     json::{JsonLocalWriter, JsonWriter, PassThrough},
     local::{LocalFileSystemWriter, LocalWriter},
     parquet::{FixedSizeRecordBatchBuilder, ParquetLocalWriter, RecordBatchBufferingWriter},
 };
 
-use super::two_phase_committer::{TwoPhaseCommitter, TwoPhaseCommitterOperator};
+use super::two_phase_committer::{CommitStrategy, TwoPhaseCommitter, TwoPhaseCommitterOperator};
 
 pub struct FileSystemSink<
     K: Key,
@@ -45,6 +48,7 @@ pub struct FileSystemSink<
     sender: Sender<FileSystemMessages<T>>,
     partitioner: Option<Box<dyn Fn(&Record<K, T>) -> String + Send>>,
     checkpoint_receiver: Receiver<CheckpointData<T>>,
+    commit_strategy: CommitStrategy,
     _ts: PhantomData<(K, R)>,
 }
 
@@ -61,6 +65,12 @@ pub type LocalParquetFileSystemSink<K, T, R> = LocalFileSystemWriter<K, T, Parqu
 
 pub type LocalJsonFileSystemSink<K, T> = LocalFileSystemWriter<K, T, JsonLocalWriter>;
 
+pub type DeltaFileSystemSink<K, T, R> = FileSystemSink<
+    K,
+    T,
+    BatchMultipartWriter<FixedSizeRecordBatchBuilder<R>, RecordBatchBufferingWriter<R>>,
+>;
+
 impl<K: Key, T: Data + Sync + Serialize, V: LocalWriter<T>> LocalFileSystemWriter<K, T, V> {
     pub fn from_config(config_str: &str) -> TwoPhaseCommitterOperator<K, T, Self> {
         let config: OperatorConfig =
@@ -73,18 +83,20 @@ impl<K: Key, T: Data + Sync + Serialize, V: LocalWriter<T>> LocalFileSystemWrite
     }
 }
 
-impl<K: Key, T: Data + Sync + Serialize, R: MultiPartWriter<InputType = T> + Send + 'static>
-    FileSystemSink<K, T, R>
+impl<
+        K: Key,
+        T: Data + Sync + SchemaData + Serialize,
+        R: MultiPartWriter<InputType = T> + Send + 'static,
+    > FileSystemSink<K, T, R>
 {
-    pub fn from_config(config_str: &str) -> TwoPhaseCommitterOperator<K, T, Self> {
-        let config: OperatorConfig =
-            serde_json::from_str(config_str).expect("Invalid config for FileSystemSink");
-        let table: FileSystemTable =
-            serde_json::from_value(config.table).expect("Invalid table config for FileSystemSink");
-
+    pub fn create_and_start(table: FileSystemTable) -> TwoPhaseCommitterOperator<K, T, Self> {
         let (sender, receiver) = tokio::sync::mpsc::channel(10000);
         let (checkpoint_sender, checkpoint_receiver) = tokio::sync::mpsc::channel(10000);
         let partition_func = get_partitioner_from_table(&table);
+        let commit_strategy = match table.file_settings.as_ref().unwrap().commit_style.unwrap() {
+            CommitStyle::Direct => CommitStrategy::PerSubtask,
+            CommitStyle::DeltaLake => CommitStrategy::PerOperator,
+        };
         tokio::spawn(async move {
             let path: Path = StorageProvider::get_key(&table.write_target.path)
                 .unwrap()
@@ -104,13 +116,23 @@ impl<K: Key, T: Data + Sync + Serialize, R: MultiPartWriter<InputType = T> + Sen
             );
             writer.run().await.unwrap();
         });
+
         TwoPhaseCommitterOperator::new(Self {
             sender,
             checkpoint_receiver,
+            commit_strategy,
             partitioner: partition_func,
             _ts: PhantomData,
         })
     }
+
+    pub fn from_config(config_str: &str) -> TwoPhaseCommitterOperator<K, T, Self> {
+        let config: OperatorConfig =
+            serde_json::from_str(config_str).expect("Invalid config for FileSystemSink");
+        let table: FileSystemTable =
+            serde_json::from_value(config.table).expect("Invalid table config for FileSystemSink");
+        Self::create_and_start(table)
+    }
 }
 
 fn get_partitioner_from_table<K: Key, T: Data + Serialize>(
@@ -183,7 +205,10 @@ enum FileSystemMessages<T: Data> {
 #[derive(Debug)]
 enum CheckpointData<T: Data> {
     InProgressFileCheckpoint(InProgressFileCheckpoint<T>),
-    Finished { max_file_index: usize },
+    Finished {
+        max_file_index: usize,
+        delta_version: i64,
+    },
 }
 
 #[derive(Decode, Encode, Clone, PartialEq, Eq)]
@@ -193,6 +218,7 @@ struct InProgressFileCheckpoint<T: Data> {
     data: FileCheckpointData,
     buffered_data: Vec<T>,
     representative_timestamp: SystemTime,
+    pushed_size: usize,
 }
 
 impl<T: Data> std::fmt::Debug for InProgressFileCheckpoint<T> {
@@ -318,7 +344,7 @@ pub enum InFlightPartCheckpoint {
     InProgressPart { part: usize, data: Vec<u8> },
 }
 
-struct AsyncMultipartFileSystemWriter<T: Data + Sync, R: MultiPartWriter> {
+struct AsyncMultipartFileSystemWriter<T: Data + SchemaData + Sync, R: MultiPartWriter> {
     path: Path,
     active_writers: HashMap<Option<String>, String>,
     watermark: Option<SystemTime>,
@@ -332,6 +358,13 @@ struct AsyncMultipartFileSystemWriter<T: Data + Sync, R: MultiPartWriter> {
     files_to_finish: Vec<FileToFinish>,
     properties: FileSystemTable,
     rolling_policy: RollingPolicy,
+    commit_state: CommitState,
+}
+
+#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
+pub enum CommitState {
+    DeltaLake { last_version: i64 },
+    VanillaParquet,
 }
 
 #[async_trait]
@@ -380,6 +413,7 @@ async fn from_checkpoint(
     path: &Path,
     partition: Option<String>,
     checkpoint_data: FileCheckpointData,
+    mut pushed_size: usize,
     object_store: Arc<StorageProvider>,
 ) -> Result<Option<FileToFinish>> {
     let mut parts = vec![];
@@ -397,6 +431,7 @@ async fn from_checkpoint(
                 .expect("failed to create multipart upload");
             let mut parts = vec![];
             for (part_index, data) in parts_to_add.into_iter().enumerate() {
+                pushed_size += data.len();
                 let upload_part = object_store
                     .add_multipart(path, &multipart_id, part_index, data.into())
                     .await
@@ -404,6 +439,7 @@ async fn from_checkpoint(
                 parts.push(upload_part);
             }
             if let Some(trailing_bytes) = trailing_bytes {
+                pushed_size += trailing_bytes.len();
                 let upload_part = object_store
                     .add_multipart(path, &multipart_id, parts.len(), trailing_bytes.into())
                     .await?;
@@ -432,6 +468,7 @@ async fn from_checkpoint(
                 }
             }
             if let Some(trailing_bytes) = trailing_bytes {
+                pushed_size += trailing_bytes.len();
                 let upload_part = object_store
                     .add_multipart(
                         path,
@@ -480,6 +517,7 @@ async fn from_checkpoint(
         partition,
         multi_part_upload_id: multipart_id,
         completed_parts: parts.into_iter().map(|p| p.content_id).collect(),
+        size: pushed_size,
     }))
 }
 
@@ -489,6 +527,14 @@ pub struct FileToFinish {
     partition: Option<String>,
     multi_part_upload_id: String,
     completed_parts: Vec<String>,
+    size: usize,
+}
+
+#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
+pub struct FinishedFile {
+    filename: String,
+    partition: Option<String>,
+    size: usize,
 }
 
 enum RollingPolicy {
@@ -573,7 +619,7 @@ pub struct MultiPartWriterStats {
 
 impl<T, R> AsyncMultipartFileSystemWriter<T, R>
 where
-    T: Data + std::marker::Sync,
+    T: Data + SchemaData + std::marker::Sync,
     R: MultiPartWriter<InputType = T>,
 {
     fn new(
@@ -599,6 +645,7 @@ where
                 writer_properties.file_settings.as_ref().unwrap(),
             ),
             properties: writer_properties,
+            commit_state: CommitState::DeltaLake { last_version: -1 },
         }
     }
 
@@ -622,7 +669,7 @@ where
                             self.subtask_id = subtask_id;
                             for recovered_file in recovered_files {
                                 if let Some(file_to_finish) = from_checkpoint(
-                                     &Path::parse(&recovered_file.filename)?, recovered_file.partition.clone(), recovered_file.data, self.object_store.clone()).await? {
+                                     &Path::parse(&recovered_file.filename)?, recovered_file.partition.clone(), recovered_file.data, recovered_file.pushed_size, self.object_store.clone()).await? {
                                         self.add_part_to_finish(file_to_finish);
                                      }
 
@@ -638,13 +685,12 @@ where
                                 self.stop().await?;
                             }
                             self.take_checkpoint( subtask_id).await?;
-                            self.checkpoint_sender.send(CheckpointData::Finished {  max_file_index: self.max_file_index}).await?;
+                            let delta_version = self.delta_version();
+                            self.checkpoint_sender.send(CheckpointData::Finished { max_file_index: self.max_file_index,
+                                delta_version }).await?;
                         },
                         FileSystemMessages::FilesToFinish(files_to_finish) =>{
-                            for file_to_finish in files_to_finish {
-                                self.finish_file(file_to_finish).await?;
-                            }
-                            self.checkpoint_sender.send(CheckpointData::Finished {  max_file_index: self.max_file_index}).await?;
+                            self.finish_files(files_to_finish).await?;
                         }
                     }
                 }
@@ -757,17 +803,56 @@ where
         }
     }
 
-    async fn finish_file(&mut self, file_to_finish: FileToFinish) -> Result<()> {
+    async fn finish_files(&mut self, files_to_finish: Vec<FileToFinish>) -> Result<()> {
+        let mut finished_files: Vec<FinishedFile> = vec![];
+        for file_to_finish in files_to_finish {
+            if let Some(file) = self.finish_file(file_to_finish).await? {
+                finished_files.push(file);
+            }
+        }
+        if let CommitState::DeltaLake { last_version } = self.commit_state {
+            if let Some(new_version) = delta::commit_files_to_delta(
+                finished_files,
+                self.path.clone(),
+                self.object_store.clone(),
+                last_version,
+                T::schema(),
+            )
+            .await?
+            {
+                self.commit_state = CommitState::DeltaLake {
+                    last_version: new_version,
+                };
+            }
+        }
+        let finished_message = CheckpointData::Finished {
+            max_file_index: self.max_file_index,
+            delta_version: self.delta_version(),
+        };
+        self.checkpoint_sender.send(finished_message).await?;
+        Ok(())
+    }
+
+    fn delta_version(&self) -> i64 {
+        match self.commit_state {
+            CommitState::DeltaLake { last_version } => last_version,
+            CommitState::VanillaParquet => 0,
+        }
+    }
+
+    async fn finish_file(&mut self, file_to_finish: FileToFinish) -> Result<Option<FinishedFile>> {
         let FileToFinish {
             filename,
-            partition: _,
+            partition,
             multi_part_upload_id,
             completed_parts,
+            size,
         } = file_to_finish;
         if completed_parts.len() == 0 {
             warn!("no parts to finish for file {}", filename);
-            return Ok(());
+            return Ok(None);
         }
+
         let parts: Vec<_> = completed_parts
             .into_iter()
             .map(|content_id| PartId {
@@ -775,10 +860,39 @@ where
             })
             .collect();
         let location = Path::parse(&filename)?;
-        self.object_store
+        match self
+            .object_store
             .close_multipart(&location, &multi_part_upload_id, parts)
-            .await?;
-        Ok(())
+            .await
+        {
+            Ok(_) => Ok(Some(FinishedFile {
+                filename,
+                partition,
+                size,
+            })),
+            Err(err) => {
+                warn!(
+                    "failed to complete multipart upload for {}: {}",
+                    filename, err
+                );
+                // check if the file is already there with the correct size.
+                let contents = self.object_store.get(&filename).await?;
+                if contents.len() == size {
+                    Ok(Some(FinishedFile {
+                        filename,
+                        partition,
+                        size,
+                    }))
+                } else {
+                    bail!(
+                        "file written to {} should have length of {}, not {}",
+                        filename,
+                        size,
+                        contents.len()
+                    );
+                }
+            }
+        }
     }
 
     async fn stop(&mut self) -> Result<()> {
@@ -814,6 +928,7 @@ where
                         .as_ref()
                         .unwrap()
                         .representative_timestamp,
+                    pushed_size: writer.stats().as_ref().unwrap().bytes_written,
                 });
             self.checkpoint_sender.send(in_progress_checkpoint).await?;
         }
@@ -830,6 +945,7 @@ where
                         buffered_data: vec![],
                         // TODO: this is only needed if there is buffered data, so this is a dummy value
                         representative_timestamp: SystemTime::now(),
+                        pushed_size: file_to_finish.size,
                     },
                 ))
                 .await?;
@@ -906,6 +1022,7 @@ impl MultipartManager {
             // TODO: use Bytes to avoid clone
             data: part_to_upload.byte_data.clone(),
         });
+        self.pushed_size += part_to_upload.byte_data.len();
         let location = self.location.clone();
         let multipart_id = self
             .multipart_id
@@ -988,6 +1105,7 @@ impl MultipartManager {
                         }
                     })
                     .collect::<Result<Vec<_>>>()?,
+                size: self.pushed_size,
             }))
         }
     }
@@ -1118,6 +1236,7 @@ impl MultipartManager {
                     }
                 })
                 .collect(),
+            size: self.pushed_size,
         }
     }
 }
@@ -1141,7 +1260,6 @@ pub trait BatchBufferingWriter: Send {
     fn get_trailing_bytes_for_checkpoint(&mut self) -> Option<Vec<u8>>;
     fn close(&mut self, final_batch: Option<Self::BatchData>) -> Option<Vec<u8>>;
 }
-
 pub struct BatchMultipartWriter<
     BB: BatchBuilder,
     BBW: BatchBufferingWriter<BatchData = BB::BatchData>,
@@ -1333,6 +1451,7 @@ impl Debug for MultipartCallback {
 pub struct FileSystemDataRecovery<T: Data> {
     next_file_index: usize,
     active_files: Vec<InProgressFileCheckpoint<T>>,
+    delta_version: i64,
 }
 
 #[async_trait]
@@ -1347,6 +1466,10 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
         "filesystem_sink".to_string()
     }
 
+    fn commit_strategy(&self) -> CommitStrategy {
+        self.commit_strategy
+    }
+
     async fn init(
         &mut self,
         task_info: &TaskInfo,
@@ -1401,7 +1524,10 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
         // loop over checkpoint receiver until finished received
         while let Some(checkpoint_message) = self.checkpoint_receiver.recv().await {
             match checkpoint_message {
-                CheckpointData::Finished { max_file_index: _ } => return Ok(()),
+                CheckpointData::Finished {
+                    max_file_index: _,
+                    delta_version: _,
+                } => return Ok(()),
                 _ => {
                     bail!("unexpected checkpoint message")
                 }
@@ -1427,11 +1553,15 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
         let mut active_files = Vec::new();
         while let Some(checkpoint_message) = self.checkpoint_receiver.recv().await {
             match checkpoint_message {
-                CheckpointData::Finished { max_file_index } => {
+                CheckpointData::Finished {
+                    max_file_index,
+                    delta_version,
+                } => {
                     return Ok((
                         FileSystemDataRecovery {
                             next_file_index: max_file_index + 1,
                             active_files,
+                            delta_version,
                         },
                         pre_commit_messages,
                     ))
@@ -1442,6 +1572,7 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
                     data,
                     buffered_data,
                     representative_timestamp,
+                    pushed_size,
                 }) => {
                     if let FileCheckpointData::MultiPartWriterUploadCompleted {
                         multi_part_upload_id,
@@ -1455,6 +1586,7 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
                                 partition,
                                 multi_part_upload_id,
                                 completed_parts,
+                                size: pushed_size,
                             },
                         );
                     } else {
@@ -1464,6 +1596,7 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
                             data,
                             buffered_data,
                             representative_timestamp,
+                            pushed_size,
                         })
                     }
                 }
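
The `pushed_size`/`size` values threaded through the checkpoint data above exist so that a failed `close_multipart` can be resolved later: `finish_file` treats the error as success only when the object is already present with exactly the number of bytes that were pushed, and bails otherwise. A small illustrative helper (not part of this patch) showing the quantity being tracked:

    // Illustrative only: the expected final object length is the sum of the
    // uploaded part sizes plus any trailing bytes flushed at checkpoint time.
    fn expected_object_size(parts: &[Vec<u8>], trailing_bytes: Option<&[u8]>) -> usize {
        parts.iter().map(|part| part.len()).sum::<usize>()
            + trailing_bytes.map_or(0, |bytes| bytes.len())
    }
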
diff --git a/arroyo-worker/src/connectors/impulse.rs b/arroyo-worker/src/connectors/impulse.rs
index 4b94c6a9a..7ab04b650 100644
--- a/arroyo-worker/src/connectors/impulse.rs
+++ b/arroyo-worker/src/connectors/impulse.rs
@@ -167,7 +167,7 @@ impl<K: Data, T: Data> ImpulseSourceFunc<K, T> {
                         }
                     }
                 }
-                Ok(ControlMessage::Commit { epoch: _ }) => {
+                Ok(ControlMessage::Commit { .. }) => {
                     unreachable!("sources shouldn't receive commit messages");
                 }
                 Ok(ControlMessage::LoadCompacted { compacted }) => {
diff --git a/arroyo-worker/src/connectors/kafka/sink/mod.rs b/arroyo-worker/src/connectors/kafka/sink/mod.rs
index 7d181663f..55f73492e 100644
--- a/arroyo-worker/src/connectors/kafka/sink/mod.rs
+++ b/arroyo-worker/src/connectors/kafka/sink/mod.rs
@@ -251,7 +251,12 @@ impl<K: Key + Serialize, T: SchemaData + Serialize> KafkaSinkFunc<K, T> {
         }
     }
 
-    async fn handle_commit(&mut self, epoch: u32, ctx: &mut crate::engine::Context<(), ()>) {
+    async fn handle_commit(
+        &mut self,
+        epoch: u32,
+        _commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
+        ctx: &mut crate::engine::Context<(), ()>,
+    ) {
         let ConsistencyMode::ExactlyOnce {
             next_transaction_index: _,
             producer_to_complete,
@@ -295,8 +300,8 @@ impl<K: Key + Serialize, T: SchemaData + Serialize> KafkaSinkFunc<K, T> {
         if !self.is_committing() {
             return;
         }
-        if let Some(ControlMessage::Commit { epoch }) = ctx.control_rx.recv().await {
-            self.handle_commit(epoch, ctx).await;
+        if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await {
+            self.handle_commit(epoch, commit_data, ctx).await;
         } else {
             warn!("no commit message received, not committing")
         }
diff --git a/arroyo-worker/src/connectors/kafka/source/mod.rs b/arroyo-worker/src/connectors/kafka/source/mod.rs
index 97481753a..57707323a 100644
--- a/arroyo-worker/src/connectors/kafka/source/mod.rs
+++ b/arroyo-worker/src/connectors/kafka/source/mod.rs
@@ -303,7 +303,7 @@ where
                                 }
                             }
                         }
-                        Some(ControlMessage::Commit { epoch: _ }) => {
+                        Some(ControlMessage::Commit { .. }) => {
                             unreachable!("sources shouldn't receive commit messages");
                         }
                         Some(ControlMessage::LoadCompacted {compacted}) => {
diff --git a/arroyo-worker/src/connectors/kafka/source/test.rs b/arroyo-worker/src/connectors/kafka/source/test.rs
index af014efed..0e833cb2f 100644
--- a/arroyo-worker/src/connectors/kafka/source/test.rs
+++ b/arroyo-worker/src/connectors/kafka/source/test.rs
@@ -252,6 +252,7 @@ async fn test_kafka() {
         tables: source::tables(),
         backend_data: checkpoint_completed.subtask_metadata.backend_data,
         bytes: checkpoint_completed.subtask_metadata.bytes,
+        commit_data: None,
     })
     .await;
 
diff --git a/arroyo-worker/src/connectors/kinesis/source/mod.rs b/arroyo-worker/src/connectors/kinesis/source/mod.rs
index f6af5118b..683267003 100644
--- a/arroyo-worker/src/connectors/kinesis/source/mod.rs
+++ b/arroyo-worker/src/connectors/kinesis/source/mod.rs
@@ -450,7 +450,7 @@ impl<K: Data, T: SchemaData> KinesisSourceFunc<K, T> {
                                 }
                             }
                         }
-                        Some(ControlMessage::Commit { epoch: _ }) => {
+                        Some(ControlMessage::Commit { .. }) => {
                             unreachable!("sources shouldn't receive commit messages");
                         }
                         Some(ControlMessage::LoadCompacted { compacted }) => {
@@ -458,7 +458,6 @@ impl<K: Data, T: SchemaData> KinesisSourceFunc<K, T> {
                         },
                         Some(ControlMessage::NoOp ) => {}
                         None => {
-
                         }
                     }
                 }
diff --git a/arroyo-worker/src/connectors/polling_http.rs b/arroyo-worker/src/connectors/polling_http.rs
index 591f883b3..e8b01ab24 100644
--- a/arroyo-worker/src/connectors/polling_http.rs
+++ b/arroyo-worker/src/connectors/polling_http.rs
@@ -157,7 +157,7 @@ where
                     }
                 }
             }
-            ControlMessage::Commit { epoch: _ } => {
+            ControlMessage::Commit { .. } => {
                 unreachable!("sources shouldn't receive commit messages");
             }
             ControlMessage::LoadCompacted { compacted } => {
diff --git a/arroyo-worker/src/connectors/sse.rs b/arroyo-worker/src/connectors/sse.rs
index 8099ac29e..445e9f729 100644
--- a/arroyo-worker/src/connectors/sse.rs
+++ b/arroyo-worker/src/connectors/sse.rs
@@ -136,7 +136,7 @@ where
                     }
                 }
             }
-            ControlMessage::Commit { epoch: _ } => {
+            ControlMessage::Commit { .. } => {
                 unreachable!("sources shouldn't receive commit messages");
             }
             ControlMessage::LoadCompacted { compacted } => {
diff --git a/arroyo-worker/src/connectors/two_phase_committer.rs b/arroyo-worker/src/connectors/two_phase_committer.rs
index 4e5e1862c..5184196fb 100644
--- a/arroyo-worker/src/connectors/two_phase_committer.rs
+++ b/arroyo-worker/src/connectors/two_phase_committer.rs
@@ -10,6 +10,7 @@ use arroyo_rpc::{
 use arroyo_state::tables::global_keyed_map::GlobalKeyedState;
 use arroyo_types::{Data, Key, Record, TaskInfo, Watermark};
 use async_trait::async_trait;
+use bincode::config;
 use tracing::warn;
 
 #[derive(StreamNode)]
@@ -59,6 +60,17 @@ pub trait TwoPhaseCommitter<K: Key, T: Data + Sync>: Send + 'static {
         watermark: Option<SystemTime>,
         stopping: bool,
     ) -> Result<(Self::DataRecovery, HashMap<String, Self::PreCommit>)>;
+    fn commit_strategy(&self) -> CommitStrategy {
+        CommitStrategy::PerSubtask
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum CommitStrategy {
+    /// Uses the subtask itself as the committer, writing its pre-commit messages to state for restoration.
+    PerSubtask,
+    /// Uses subtask 0 as the committer, passing all pre-commit data through the control system/checkpoint metadata.
+    PerOperator,
 }
 
 #[process_fn(in_k = K, in_t = T)]
@@ -129,13 +141,21 @@ impl<K: Key, T: Data + Sync, TPC: TwoPhaseCommitter<K, T>> TwoPhaseCommitterOper
     }
 
     async fn on_close(&mut self, ctx: &mut crate::engine::Context<(), ()>) {
-        if let Some(ControlMessage::Commit { epoch }) = ctx.control_rx.recv().await {
-            self.handle_commit(epoch, ctx).await;
+        if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await {
+            self.handle_commit(epoch, commit_data, ctx).await;
         } else {
             warn!("no commit message received, not committing")
         }
     }
 
+    fn map_from_serialized_data(serialized_data: Vec<u8>) -> Vec<TPC::PreCommit> {
+        let map: HashMap<String, TPC::PreCommit> =
+            bincode::decode_from_slice(&serialized_data, config::standard())
+                .unwrap()
+                .0;
+        map.into_values().collect()
+    }
+
     async fn handle_checkpoint(
         &mut self,
         checkpoint_barrier: &arroyo_types::CheckpointBarrier,
@@ -155,22 +175,58 @@ impl<K: Key, T: Data + Sync, TPC: TwoPhaseCommitter<K, T>> TwoPhaseCommitterOper
             )
             .await
             .unwrap();
+
         let mut recovery_data_state: GlobalKeyedState<usize, _, _> =
             ctx.state.get_global_keyed_state('r').await;
         recovery_data_state
             .insert(ctx.task_info.task_index, recovery_data)
             .await;
-        let mut pre_commit_state: GlobalKeyedState<String, _, _> =
-            ctx.state.get_global_keyed_state('p').await;
         self.pre_commits.clear();
-        for (key, value) in pre_commits {
-            self.pre_commits.push(value.clone());
-            pre_commit_state.insert(key, value).await;
+        if pre_commits.is_empty() {
+            return;
+        }
+        let commit_strategy = self.committer.commit_strategy();
+        match commit_strategy {
+            CommitStrategy::PerSubtask => {
+                let mut pre_commit_state: GlobalKeyedState<String, _, _> =
+                    ctx.state.get_global_keyed_state('p').await;
+                for (key, value) in pre_commits {
+                    self.pre_commits.push(value.clone());
+                    pre_commit_state.insert(key, value).await;
+                }
+            }
+            CommitStrategy::PerOperator => {
+                let serialized_pre_commits =
+                    bincode::encode_to_vec(&pre_commits, config::standard()).unwrap();
+                ctx.state
+                    .insert_committing_data('p', serialized_pre_commits)
+                    .await;
+            }
         }
     }
-    async fn handle_commit(&mut self, epoch: u32, ctx: &mut crate::engine::Context<(), ()>) {
-        let pre_commits = self.pre_commits.clone();
-        self.pre_commits.clear();
+    async fn handle_commit(
+        &mut self,
+        epoch: u32,
+        mut commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
+        ctx: &mut crate::engine::Context<(), ()>,
+    ) {
+        let pre_commits = match self.committer.commit_strategy() {
+            CommitStrategy::PerSubtask => std::mem::take(&mut self.pre_commits),
+            CommitStrategy::PerOperator => {
+                // only subtask 0 should be committing
+                if ctx.task_info.task_index == 0 {
+                    commit_data
+                        .remove(&'p')
+                        .unwrap_or_default()
+                        .into_values()
+                        .flat_map(|serialized_data| Self::map_from_serialized_data(serialized_data))
+                        .collect()
+                } else {
+                    vec![]
+                }
+            }
+        };
+
         self.committer
             .commit(&ctx.task_info, pre_commits)
             .await
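
To make the PerOperator path concrete, a hedged sketch of the bincode round trip performed by `handle_checkpoint` and `map_from_serialized_data`, using a stand-in pre-commit type rather than any connector's real one:

    use std::collections::HashMap;

    use bincode::config;

    // Stand-in pre-commit type; real connector types derive Encode/Decode the same way.
    #[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq)]
    struct FakePreCommit {
        file: String,
    }

    fn per_operator_round_trip() {
        // Checkpoint side (every subtask): serialize the pre-commit map and hand
        // the bytes to the checkpoint metadata under state table 'p'.
        let mut pre_commits = HashMap::new();
        pre_commits.insert(
            "file-0".to_string(),
            FakePreCommit { file: "part-0.parquet".to_string() },
        );
        let blob = bincode::encode_to_vec(&pre_commits, config::standard()).unwrap();

        // Commit side (subtask 0 only): decode each subtask's blob and commit the union.
        let (decoded, _): (HashMap<String, FakePreCommit>, usize) =
            bincode::decode_from_slice(&blob, config::standard()).unwrap();
        assert_eq!(decoded, pre_commits);
    }
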
diff --git a/arroyo-worker/src/connectors/websocket.rs b/arroyo-worker/src/connectors/websocket.rs
index 02e30a72c..b3edcd229 100644
--- a/arroyo-worker/src/connectors/websocket.rs
+++ b/arroyo-worker/src/connectors/websocket.rs
@@ -133,7 +133,7 @@ where
                     }
                 }
             }
-            ControlMessage::Commit { epoch: _ } => {
+            ControlMessage::Commit { .. } => {
                 unreachable!("sources shouldn't receive commit messages");
             }
             ControlMessage::LoadCompacted { compacted } => {
diff --git a/arroyo-worker/src/formats.rs b/arroyo-worker/src/formats.rs
index 95ffdef7f..7dbd57fe0 100644
--- a/arroyo-worker/src/formats.rs
+++ b/arroyo-worker/src/formats.rs
@@ -327,7 +327,7 @@ pub mod timestamp_as_rfc3339 {
         D: Deserializer<'de>,
     {
         let raw: chrono::DateTime<Utc> = DateTime::deserialize(deserializer)?;
-        Ok(from_nanos(raw.timestamp_nanos() as u128))
+        Ok(from_nanos(raw.timestamp_nanos_opt().unwrap() as u128))
     }
 }
 
@@ -355,7 +355,7 @@ pub mod opt_timestamp_as_rfc3339 {
         D: Deserializer<'de>,
     {
         let raw = Option::<DateTime<Utc>>::deserialize(deserializer)?;
-        Ok(raw.map(|raw| from_nanos(raw.timestamp_nanos() as u128)))
+        Ok(raw.map(|raw| from_nanos(raw.timestamp_nanos_opt().unwrap() as u128)))
     }
 }
 
diff --git a/arroyo-worker/src/lib.rs b/arroyo-worker/src/lib.rs
index e373e7405..d2f938273 100644
--- a/arroyo-worker/src/lib.rs
+++ b/arroyo-worker/src/lib.rs
@@ -9,11 +9,11 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient;
 use arroyo_rpc::grpc::worker_grpc_server::{WorkerGrpc, WorkerGrpcServer};
 use arroyo_rpc::grpc::{
-    CheckpointReq, CheckpointResp, HeartbeatReq, JobFinishedReq, JobFinishedResp,
-    LoadCompactedDataReq, LoadCompactedDataRes, RegisterWorkerReq, StartExecutionReq,
-    StartExecutionResp, StopExecutionReq, StopExecutionResp, TaskCheckpointCompletedReq,
-    TaskCheckpointEventReq, TaskFailedReq, TaskFinishedReq, TaskStartedReq, WorkerErrorReq,
-    WorkerResources,
+    CheckpointReq, CheckpointResp, CommitReq, CommitResp, HeartbeatReq, JobFinishedReq,
+    JobFinishedResp, LoadCompactedDataReq, LoadCompactedDataRes, RegisterWorkerReq,
+    StartExecutionReq, StartExecutionResp, StopExecutionReq, StopExecutionResp,
+    TaskCheckpointCompletedReq, TaskCheckpointEventReq, TaskFailedReq, TaskFinishedReq,
+    TaskStartedReq, WorkerErrorReq, WorkerResources,
 };
 use arroyo_server_common::start_admin_server;
 use arroyo_types::{
@@ -566,7 +566,10 @@ impl WorkerGrpc for WorkerServer {
             };
             for sender in &senders {
                 sender
-                    .send(ControlMessage::Commit { epoch: req.epoch })
+                    .send(ControlMessage::Commit {
+                        epoch: req.epoch,
+                        commit_data: HashMap::new(),
+                    })
                     .await
                     .unwrap();
             }
@@ -599,6 +602,49 @@ impl WorkerGrpc for WorkerServer {
         Ok(Response::new(CheckpointResp {}))
     }
 
+    async fn commit(&self, request: Request<CommitReq>) -> Result<Response<CommitResp>, Status> {
+        let req = request.into_inner();
+        let sender_commit_map_pairs = {
+            let state_mutex = self.state.lock().unwrap();
+            let Some(state) = state_mutex.as_ref() else {
+                return Err(Status::failed_precondition(
+                    "Worker has not yet started execution",
+                ));
+            };
+            info!("handling commit request for epoch {}", req.epoch);
+            let mut sender_commit_map_pairs = vec![];
+            info!("commit data: {:?}", req.committing_data);
+            for (operator_id, commit_operator) in req.committing_data {
+                let nodes = state.operator_controls.get(&operator_id).unwrap().clone();
+                let commit_map: HashMap<_, _> = commit_operator
+                    .committing_data
+                    .into_iter()
+                    .map(|(table, backend_data)| {
+                        (
+                            table.chars().next().unwrap(),
+                            backend_data.commit_data_by_subtask,
+                        )
+                    })
+                    .collect();
+                sender_commit_map_pairs.push((nodes, commit_map));
+            }
+            sender_commit_map_pairs
+        };
+        for (senders, commit_map) in sender_commit_map_pairs {
+            for sender in senders {
+                sender
+                    .send(ControlMessage::Commit {
+                        epoch: req.epoch,
+                        commit_data: commit_map.clone(),
+                    })
+                    .await
+                    .unwrap();
+            }
+        }
+        info!("finished sending commits");
+        Ok(Response::new(CommitResp {}))
+    }
+
     async fn load_compacted_data(
         &self,
         request: Request<LoadCompactedDataReq>,
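
The commit data fanned out above is keyed first by the one-character state table name ('p' for pre-commits) and then by subtask index, with each value holding that subtask's bincode-serialized pre-commit map. A hypothetical helper (names not from this patch) that builds the nested map carried by ControlMessage::Commit:

    use std::collections::HashMap;

    // Hypothetical: package one subtask's serialized pre-commits into the
    // HashMap<char, HashMap<u32, Vec<u8>>> shape sent with ControlMessage::Commit.
    fn commit_data_for_subtask(
        subtask_index: u32,
        serialized: Vec<u8>,
    ) -> HashMap<char, HashMap<u32, Vec<u8>>> {
        let mut by_subtask = HashMap::new();
        by_subtask.insert(subtask_index, serialized);
        let mut commit_data = HashMap::new();
        commit_data.insert('p', by_subtask);
        commit_data
    }
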
diff --git a/connector-schemas/filesystem/table.json b/connector-schemas/filesystem/table.json
index 1405b1d6c..65e9fa53f 100644
--- a/connector-schemas/filesystem/table.json
+++ b/connector-schemas/filesystem/table.json
@@ -109,6 +109,14 @@
                         }
                     },
                     "additionalProperties": false
+                },
+                "commit_style": {
+                    "title": "Commit Style",
+                    "type": "string",
+                    "enum": [
+                        "direct",
+                        "delta_lake"
+                    ]
                 }
             },
             "additionalProperties": false