From 677a22266f76f94ec45941bea426760f843ef01f Mon Sep 17 00:00:00 2001
From: erp12
Date: Thu, 22 Apr 2021 01:56:10 -0400
Subject: [PATCH 1/5] Changes deprecated collection conversions and adds class-exists?
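
scala.collection.JavaConversions has been deprecated since Scala 2.12 in
favour of JavaConverters. A sketch of how the touched helpers are expected to
behave at a REPL (hedged: assumes the usual lossless round-trip through
scala.collection types):

    (require '[zero-one.geni.interop :as interop])

    ;; Round-trip a Clojure vector through a scala.collection.Seq.
    (-> [1 2 3] interop/->scala-seq interop/scala-seq->vec)
    ;; => [1 2 3]

    ;; class-exists? resolves a class name against the thread's context
    ;; classloader: truthy when the class is on the classpath, nil otherwise.
    (interop/class-exists? 'io.delta.sql.DeltaSparkSessionExtension)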
---
 src/clojure/zero_one/geni/interop.clj | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/src/clojure/zero_one/geni/interop.clj b/src/clojure/zero_one/geni/interop.clj
index f1aa4d80..2645bc96 100644
--- a/src/clojure/zero_one/geni/interop.clj
+++ b/src/clojure/zero_one/geni/interop.clj
@@ -4,6 +4,7 @@
    [clojure.java.data :as j]
    [clojure.string :refer [replace-first]]
    [clojure.walk :as walk]
+   [clojure.reflect :refer [resolve-class]]
    [zero-one.geni.docs :as docs]
    [zero-one.geni.utils :refer [ensure-coll]])
   (:import
@@ -20,8 +21,9 @@
     Function3
     Tuple2
     Tuple3)
-   (scala.collection JavaConversions Map Seq)
-   (scala.collection.convert Wrappers$IterableWrapper)))
+   (scala.collection Map Seq)
+   (scala.collection.convert Wrappers$IterableWrapper)
+   (scala.collection JavaConverters)))
 
 (declare ->clojure)
 
@@ -44,15 +46,15 @@
   (instance? Tuple3 value))
 
 (defn scala-seq->vec [scala-seq]
-  (vec (JavaConversions/seqAsJavaList scala-seq)))
+  (-> scala-seq (JavaConverters/seqAsJavaList) vec))
 
 (defn scala-map->map [^Map m]
   (into {}
-        (for [[k v] (JavaConversions/mapAsJavaMap m)]
+        (for [[k v] (JavaConverters/mapAsJavaMap m)]
           [k (->clojure v)])))
 
 (defn ->scala-seq [coll]
-  (JavaConversions/asScalaBuffer (seq coll)))
+  (JavaConverters/asScalaBuffer (seq coll)))
 
 (defn ->scala-tuple2 [coll]
   (Tuple2. (first coll) (second coll)))
@@ -75,6 +77,13 @@
 (defn ->scala-function3 [f]
   (reify Function3 (apply [_ x y z] (f x y z))))
 
+(defn ->scala-map
+  [m]
+  (JavaConverters/mapAsScalaMap m))
+
+(defn class-exists? [c]
+  (resolve-class (.getContextClassLoader (Thread/currentThread)) c))
+
 (defn optional->nillable [value]
   (when (.isPresent value)
     (.get value)))

From 4b2939bb8a30dc2acc3024ad9b84b96c0e3fefb0 Mon Sep 17 00:00:00 2001
From: erp12
Date: Thu, 22 Apr 2021 01:57:26 -0400
Subject: [PATCH 2/5] Adds function for getting the schema of a dataset
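
Example (a sketch; the output shown is the usual StructType rendering, which
may differ slightly across Spark versions):

    (require '[zero-one.geni.core :as g])

    (g/schema (g/records->dataset [{:id 1 :name "a"}]))
    ;; => a StructType along the lines of
    ;;    StructType(StructField(id,LongType,true),
    ;;               StructField(name,StringType,true))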
---
 src/clojure/zero_one/geni/core.clj         | 1 +
 src/clojure/zero_one/geni/core/dataset.clj | 7 ++++++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/clojure/zero_one/geni/core.clj b/src/clojure/zero_one/geni/core.clj
index 4d98e28f..645a44f0 100644
--- a/src/clojure/zero_one/geni/core.clj
+++ b/src/clojure/zero_one/geni/core.clj
@@ -480,6 +480,7 @@
   rollup
   sample
   sample-by
+  schema
   select
   select-expr
   show
diff --git a/src/clojure/zero_one/geni/core/dataset.clj b/src/clojure/zero_one/geni/core/dataset.clj
index 78997469..aa552fc4 100644
--- a/src/clojure/zero_one/geni/core/dataset.clj
+++ b/src/clojure/zero_one/geni/core/dataset.clj
@@ -13,7 +13,8 @@
    [zero-one.geni.interop :as interop]
    [zero-one.geni.utils :refer [ensure-coll]])
   (:import
-    (org.apache.spark.sql Column)))
+    (org.apache.spark.sql Column Dataset)
+    (org.apache.spark.sql.types StructType)))
 
 ;;;; Actions
 (defn- collected->maps [collected]
@@ -38,6 +39,10 @@
 (defn take [dataframe n-rows]
   (-> dataframe (.take n-rows) collected->maps))
 
+(defn schema ^StructType
+  [^Dataset dataframe]
+  (.schema dataframe))
+
 (defn show
   ([dataframe] (show dataframe {}))
   ([dataframe options]

From fef9ca8eda8e71c88d90c89e5947e125ec6c592f Mon Sep 17 00:00:00 2001
From: erp12
Date: Thu, 22 Apr 2021 01:58:19 -0400
Subject: [PATCH 3/5] Adds function for updating spark session settings
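
Example usage (a sketch; assumes the shared default session from
zero-one.geni.defaults):

    (require '[zero-one.geni.core :as g]
             '[zero-one.geni.defaults :as defaults])

    ;; set-settings mutates the session conf in place and returns the session.
    (g/set-settings @defaults/spark {:spark.sql.shuffle.partitions "4"})

    ;; with-settings applies the overrides only while f runs, then restores
    ;; the previous values (not thread-safe on a shared session).
    (g/with-settings @defaults/spark
                     {:spark.sql.shuffle.partitions "1"}
                     (fn [sess] (:spark.sql.shuffle.partitions (g/spark-conf sess))))
    ;; => "1", with the original value restored afterwards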
"delta" dataframe path options))) diff --git a/src/clojure/zero_one/geni/defaults.clj b/src/clojure/zero_one/geni/defaults.clj index 9d9bcfa8..2d70b094 100644 --- a/src/clojure/zero_one/geni/defaults.clj +++ b/src/clojure/zero_one/geni/defaults.clj @@ -1,10 +1,15 @@ (ns zero-one.geni.defaults (:require - [zero-one.geni.spark])) + [zero-one.geni.spark] + [zero-one.geni.interop :as iop])) (def session-config - {:configs {:spark.sql.adaptive.enabled "true" - :spark.sql.adaptive.coalescePartitions.enabled "true"} + {:configs (merge {:spark.sql.adaptive.enabled "true" + :spark.sql.adaptive.coalescePartitions.enabled "true"} + (when (iop/class-exists? 'io.delta.sql.DeltaSparkSessionExtension) + ;; Required for writing Delta tables. + {:spark.sql.extensions "io.delta.sql.DeltaSparkSessionExtension" + :spark.sql.catalog.spark_catalog "org.apache.spark.sql.delta.catalog.DeltaCatalog"})) :checkpoint-dir "target/checkpoint/"}) (def spark diff --git a/src/clojure/zero_one/geni/delta.clj b/src/clojure/zero_one/geni/delta.clj new file mode 100644 index 00000000..e7975d5b --- /dev/null +++ b/src/clojure/zero_one/geni/delta.clj @@ -0,0 +1,205 @@ +(ns zero-one.geni.delta + (:refer-clojure :exclude [update merge]) + (:require [clojure.spec.alpha :as s] + [zero-one.geni.core :as g] + [zero-one.geni.defaults :as defaults] + [zero-one.geni.interop :as i] + [zero-one.geni.utils :refer [with-dynamic-import]]) + (:import (org.apache.spark.sql Column Dataset SparkSession) + (java.util Map))) + +(s/def ::format #{:path :table}) +(s/def ::column-name (s/or :kw keyword? :str string?)) +(s/def ::column #(instance? Column %)) +(s/def ::condition ::column) +(s/def ::on-match #{:delete :update}) +(s/def ::on-not-match #{:insert}) + +(s/def ::column-rule + (s/or :some (s/map-of ::column-name ::column) + :all #{:all})) + +(s/def ::when-matched-clause + (s/keys :req [::on-match ::column-rule] + :opt [::condition])) + +(s/def ::when-matched + (s/coll-of ::when-matched-clause :kind sequential? :max-count 2)) + +(s/def ::when-not-matched + (s/keys :req [::on-not-match ::column-rule] + :opt [::condition])) + +(s/def ::merge-strategy + (s/keys :req [::condition] + :opt [::when-matched ::when-not-matched])) + +;; @todo Uncomment this code once Delta has been upgraded to support Spark 3.1.x +;; This code has been tested against Spark 3.0.2 +;(defn validate-merge-strategy +; " +; See constraints: https://docs.delta.io/latest/api/scala/io/delta/tables/DeltaMergeBuilder.html" +; [merge-strategy] +; {:pre [(s/valid? ::merge-strategy merge-strategy)]} +; (let [when-matched (get merge-strategy ::when-matched [])] +; ; There can be at most one `update` action and one `delete` action in when-matched clauses. +; (let [clause-counts (->> when-matched +; (group-by ::on-match) +; (map (fn [[k v]] [k (count v)])))] +; (when (> (count clause-counts) 2) +; (ex-info (str "Found more than 2 `when-matched` clauses in a Delta merge strategy. There can only be 1 update and 1 delete.") +; {:clause-kinds (keys clause-counts) +; ::merge-strategy merge-strategy})) +; (doseq [[clause-kind clause-count] clause-counts] +; (when (> clause-count 1) +; (ex-info (str "Found multiple " clause-kind " clauses in a Delta merge strategy.") +; {:clause-kind clause-kind +; :clause-count clause-count +; ::merge-strategy merge-strategy})))) +; +; ; If there are two when-matched clauses, then the first one must have a condition. 
---
 test/zero_one/geni/test_resources.clj | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/test/zero_one/geni/test_resources.clj b/test/zero_one/geni/test_resources.clj
index 9f6d17c8..05dbad59 100644
--- a/test/zero_one/geni/test_resources.clj
+++ b/test/zero_one/geni/test_resources.clj
@@ -41,6 +41,10 @@
                       :timestamp (long (Integer/parseInt (nth row 3)))}))
        (g/records->dataset @spark)))
 
+(defn join-paths ^String
+  [root & path-parts]
+  (.toString (Paths/get root (into-array String path-parts))))
+
 (def -tmp-dir-attr
   (into-array FileAttribute '()))
 
@@ -66,11 +70,11 @@
       (recursive-delete-dir wh-dir))))
 
 (def test-warehouses-root
-  (str (Paths/get (.getAbsolutePath (io/file "")) (into-array String ["spark-warehouses"]))))
+  (join-paths (System/getProperty "user.dir") "spark-warehouses"))
 
 (defn rand-wh-path
   []
-  (str "file:" (Paths/get test-warehouses-root (into-array String [(str (UUID/randomUUID))]))))
+  (str "file:" (join-paths test-warehouses-root (str (UUID/randomUUID)))))
 
 (defn reset-session!
   []

From 5921582b0a96f4e6a2b269236a216ba16210208e Mon Sep 17 00:00:00 2001
From: erp12
Date: Thu, 22 Apr 2021 02:00:38 -0400
Subject: [PATCH 5/5] Adds support for Delta Lake
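
The reader/writer surface mirrors the other geni data sources. A quick-start
sketch (paths are illustrative):

    (require '[zero-one.geni.core :as g])

    (def df (g/records->dataset [{:id 1} {:id 2}]))

    ;; DataFrameWriter options pass through as a map.
    (g/write-delta! df "/tmp/delta/events" {:mode "overwrite"})

    (g/collect (g/read-delta! "/tmp/delta/events"))
    ;; => [{:id 1} {:id 2}] (row order not guaranteed)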
---
 docker/project.clj                       |   1 +
 project.clj                              |   1 +
 src/clojure/zero_one/geni/core.clj       |   2 +
 .../zero_one/geni/core/data_sources.clj  |  28 +++
 src/clojure/zero_one/geni/defaults.clj   |  11 +-
 src/clojure/zero_one/geni/delta.clj      | 205 ++++++++++++++++++
 test/zero_one/geni/data_sources_test.clj |  11 +-
 test/zero_one/geni/delta_test.clj        | 189 ++++++++++++++++
 8 files changed, 444 insertions(+), 4 deletions(-)
 create mode 100644 src/clojure/zero_one/geni/delta.clj
 create mode 100644 test/zero_one/geni/delta_test.clj

diff --git a/docker/project.clj b/docker/project.clj
index 9b0d3bef..6f1a12e2 100644
--- a/docker/project.clj
+++ b/docker/project.clj
@@ -9,6 +9,7 @@
    [org.apache.spark/spark-mllib_2.12 "3.1.1"]
    [org.apache.spark/spark-sql_2.12 "3.1.1"]
    [org.apache.spark/spark-streaming_2.12 "3.1.1"]
+   [io.delta/delta-core_2.12 "0.8.0"]
    ; Arrow
    [org.apache.arrow/arrow-memory-netty "3.0.0"]
    [org.apache.arrow/arrow-memory-core "3.0.0"]
diff --git a/project.clj b/project.clj
index 9b0d3bef..6f1a12e2 100644
--- a/project.clj
+++ b/project.clj
@@ -9,6 +9,7 @@
    [org.apache.spark/spark-mllib_2.12 "3.1.1"]
    [org.apache.spark/spark-sql_2.12 "3.1.1"]
    [org.apache.spark/spark-streaming_2.12 "3.1.1"]
+   [io.delta/delta-core_2.12 "0.8.0"]
    ; Arrow
    [org.apache.arrow/arrow-memory-netty "3.0.0"]
    [org.apache.arrow/arrow-memory-core "3.0.0"]
diff --git a/src/clojure/zero_one/geni/core.clj b/src/clojure/zero_one/geni/core.clj
index bacf5757..5845101b 100644
--- a/src/clojure/zero_one/geni/core.clj
+++ b/src/clojure/zero_one/geni/core.clj
@@ -525,6 +525,7 @@
   read-avro!
   read-binary!
   read-csv!
+  read-delta!
   read-edn!
   read-jdbc!
   read-json!
@@ -535,6 +536,7 @@
   read-xlsx!
   write-avro!
   write-csv!
+  write-delta!
   write-edn!
   write-jdbc!
   write-json!
diff --git a/src/clojure/zero_one/geni/core/data_sources.clj b/src/clojure/zero_one/geni/core/data_sources.clj
index cb0c572d..ac24bdaa 100644
--- a/src/clojure/zero_one/geni/core/data_sources.clj
+++ b/src/clojure/zero_one/geni/core/data_sources.clj
@@ -384,3 +384,31 @@
    view, e.g. `SELECT * FROM global_temp.view1`."
   [^Dataset dataframe ^String view-name]
   (.createOrReplaceGlobalTempView dataframe view-name))
+
+
+;; Delta
+(defmulti read-delta!
+  "Loads a Delta table from a directory and returns the results as a DataFrame.
+
+  Spark's DataFrameReader options may be passed in as a map of options.
+
+  See: https://spark.apache.org/docs/latest/sql-data-sources.html
+  See: https://docs.delta.io/latest/quick-start.html#read-data"
+  (fn [head & _] (class head)))
+(defmethod read-delta! :default
+  ([path] (read-delta! @defaults/spark path))
+  ([path options] (read-delta! @defaults/spark path options)))
+(defmethod read-delta! SparkSession
+  ([spark path] (read-delta! spark path {}))
+  ([spark path options] (read-data! "delta" spark path options)))
+
+
+(defn write-delta!
+  "Writes a Delta table at the specified path.
+
+  Spark's DataFrameWriter options may be passed in as a map of options.
+
+  See: https://spark.apache.org/docs/latest/sql-data-sources.html
+  See: https://docs.delta.io/latest/quick-start.html#create-a-table"
+  ([dataframe path] (write-delta! dataframe path {}))
+  ([dataframe path options] (write-data! "delta" dataframe path options)))
diff --git a/src/clojure/zero_one/geni/defaults.clj b/src/clojure/zero_one/geni/defaults.clj
index 9d9bcfa8..2d70b094 100644
--- a/src/clojure/zero_one/geni/defaults.clj
+++ b/src/clojure/zero_one/geni/defaults.clj
@@ -1,10 +1,15 @@
 (ns zero-one.geni.defaults
   (:require
-   [zero-one.geni.spark]))
+   [zero-one.geni.spark]
+   [zero-one.geni.interop :as iop]))
 
 (def session-config
-  {:configs {:spark.sql.adaptive.enabled "true"
-             :spark.sql.adaptive.coalescePartitions.enabled "true"}
+  {:configs (merge {:spark.sql.adaptive.enabled "true"
+                    :spark.sql.adaptive.coalescePartitions.enabled "true"}
+                   (when (iop/class-exists? 'io.delta.sql.DeltaSparkSessionExtension)
+                     ;; Required for writing Delta tables.
+                     {:spark.sql.extensions "io.delta.sql.DeltaSparkSessionExtension"
+                      :spark.sql.catalog.spark_catalog "org.apache.spark.sql.delta.catalog.DeltaCatalog"}))
    :checkpoint-dir "target/checkpoint/"})
 
 (def spark
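
The defaults above turn on the Delta extension only when delta-core is
actually on the classpath. A session with the same settings can also be built
explicitly (a sketch; the option map follows geni's create-spark-session
conventions):

    (require '[zero-one.geni.core :as g])

    (g/create-spark-session
     {:configs {:spark.sql.extensions "io.delta.sql.DeltaSparkSessionExtension"
                :spark.sql.catalog.spark_catalog "org.apache.spark.sql.delta.catalog.DeltaCatalog"}})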
diff --git a/src/clojure/zero_one/geni/delta.clj b/src/clojure/zero_one/geni/delta.clj
new file mode 100644
index 00000000..e7975d5b
--- /dev/null
+++ b/src/clojure/zero_one/geni/delta.clj
@@ -0,0 +1,205 @@
+(ns zero-one.geni.delta
+  (:refer-clojure :exclude [update merge])
+  (:require [clojure.spec.alpha :as s]
+            [zero-one.geni.core :as g]
+            [zero-one.geni.defaults :as defaults]
+            [zero-one.geni.interop :as i]
+            [zero-one.geni.utils :refer [with-dynamic-import]])
+  (:import (org.apache.spark.sql Column Dataset SparkSession)
+           (java.util Map)))
+
+(s/def ::format #{:path :table})
+(s/def ::column-name (s/or :kw keyword? :str string?))
+(s/def ::column #(instance? Column %))
+(s/def ::condition ::column)
+(s/def ::on-match #{:delete :update})
+(s/def ::on-not-match #{:insert})
+
+(s/def ::column-rule
+  (s/or :some (s/map-of ::column-name ::column)
+        :all #{:all}))
+
+(s/def ::when-matched-clause
+  (s/keys :req [::on-match ::column-rule]
+          :opt [::condition]))
+
+(s/def ::when-matched
+  (s/coll-of ::when-matched-clause :kind sequential? :max-count 2))
+
+(s/def ::when-not-matched
+  (s/keys :req [::on-not-match ::column-rule]
+          :opt [::condition]))
+
+(s/def ::merge-strategy
+  (s/keys :req [::condition]
+          :opt [::when-matched ::when-not-matched]))
+
+;; @todo Uncomment this code once Delta has been upgraded to support Spark 3.1.x
+;; This code has been tested against Spark 3.0.2
+;(defn validate-merge-strategy
+;  "
+;  See constraints: https://docs.delta.io/latest/api/scala/io/delta/tables/DeltaMergeBuilder.html"
+;  [merge-strategy]
+;  {:pre [(s/valid? ::merge-strategy merge-strategy)]}
+;  (let [when-matched (get merge-strategy ::when-matched [])]
+;    ; There can be at most one `update` action and one `delete` action in when-matched clauses.
+;    (let [clause-counts (->> when-matched
+;                             (group-by ::on-match)
+;                             (map (fn [[k v]] [k (count v)])))]
+;      (when (> (count clause-counts) 2)
+;        (throw (ex-info (str "Found more than 2 `when-matched` clauses in a Delta merge strategy. There can only be 1 update and 1 delete.")
+;                        {:clause-kinds (keys clause-counts)
+;                         ::merge-strategy merge-strategy})))
+;      (doseq [[clause-kind clause-count] clause-counts]
+;        (when (> clause-count 1)
+;          (throw (ex-info (str "Found multiple " clause-kind " clauses in a Delta merge strategy.")
+;                          {:clause-kind clause-kind
+;                           :clause-count clause-count
+;                           ::merge-strategy merge-strategy})))))
+;
+;    ; If there are two when-matched clauses, then the first one must have a condition.
+;    (when (and (= (count when-matched) 2)
+;               (nil? (::condition (first when-matched))))
+;      (throw (ex-info "When using two when-matched clauses in a Delta merge, the first one must have a condition."
+;                      {::merge-strategy merge-strategy}))))
+;  true)
+;
+;(defn- prepare-update-set-map ^Map
+;  [m]
+;  (->> m
+;       (map (fn [[k v]] [(name k) (g/col v)]))
+;       (into {})))
+
+(with-dynamic-import
+  [[io.delta.tables DeltaTable DeltaMergeBuilder]]
+
+  (defn delta-table ^DeltaTable
+    ([opts] (delta-table @defaults/spark opts))
+    ([^SparkSession spark {:keys [format location] :or {format :path}}]
+     {:pre [(s/valid? ::format format)]}
+     (if (= format :path)
+       (. DeltaTable forPath spark location)
+       (. DeltaTable forName spark location))))
+
+  (defn delta-table?
+    ([^String identifier]
+     (DeltaTable/isDeltaTable identifier))
+    ([^SparkSession spark ^String identifier]
+     (DeltaTable/isDeltaTable spark identifier)))
+
+  (defn as ^DeltaTable
+    [^DeltaTable table ^String alias]
+    ;; @todo move to polymorphic.clj? How to handle dynamic import in multimethod?
+    (.as table alias))
+
+  (defn to-df ^Dataset
+    [^DeltaTable table]
+    ;; @todo move to polymorphic.clj? How to handle dynamic import in multimethod?
+    (.toDF table))
+
+  (defmulti convert-to-delta (fn [head & _] (class head)))
+  (defmethod convert-to-delta :default
+    [^String identifier & {:keys [partition-schema]}]
+    (convert-to-delta @defaults/spark identifier :partition-schema partition-schema))
+  (defmethod convert-to-delta SparkSession
+    [^SparkSession spark ^String identifier & {:keys [partition-schema]}]
+    (if (nil? partition-schema)
+      (DeltaTable/convertToDelta spark identifier)
+      (DeltaTable/convertToDelta spark identifier (g/->schema partition-schema))))
+
+  (defmulti delete (fn [& args] (class (last args))))
+  (defmethod delete :default
+    [^DeltaTable table]
+    (.delete table))
+  (defmethod delete Column
+    [^DeltaTable table ^Column condition]
+    (.delete table condition))
+
+  (defn history ^Dataset
+    ([^DeltaTable table]
+     (.history table))
+    ([^DeltaTable table ^Integer limit]
+     (.history table limit)))
+
+  (defn vacuum
+    ([^DeltaTable table]
+     (.vacuum table))
+    ([^DeltaTable table ^Double retention-hours]
+     (.vacuum table retention-hours)))
+
+  ;; @todo Uncomment this code once Delta has been upgraded to support Spark 3.1.x
+  ;; This code has been tested against Spark 3.0.2
+  ;(defn update
+  ;  ;; @todo Update is broken with Spark 3.1 + Delta 0.8. Upgrade Delta ASAP.
+  ;  ;; https://github.com/delta-io/delta/issues/594
+  ;  ([^DeltaTable table set-to]
+  ;   (let [m (prepare-update-set-map set-to)]
+  ;     (.update table m)))
+  ;  ([^DeltaTable table ^Column condition set-to]
+  ;   (.update table condition (prepare-update-set-map set-to))))
+  ;
+  ;(defn- apply-when-matched-clause ^DeltaMergeBuilder
+  ;  [^DeltaMergeBuilder merge-builder when-matched-clause]
+  ;  (let [{:zero-one.geni.delta/keys [condition on-match column-rule]} when-matched-clause
+  ;        on-match-builder (if (nil? condition)
+  ;                           (.whenMatched merge-builder)
+  ;                           (.whenMatched merge-builder condition))]
+  ;    (cond
+  ;      (and (= on-match :update) (= column-rule :all))
+  ;      (.updateAll on-match-builder)
+  ;
+  ;      (= on-match :update)
+  ;      (let [scala-column-rule (->> column-rule
+  ;                                   (map (fn [[k column]] [(name k) column]))
+  ;                                   (into {})
+  ;                                   (i/->scala-map))]
+  ;        (.update on-match-builder scala-column-rule))
+  ;
+  ;      (and (= on-match :delete) (= column-rule :all))
+  ;      (.delete on-match-builder)
+  ;
+  ;      (= on-match :delete)
+  ;      (throw (ex-info "If a Delta merge `when-matched` clause is set to `delete`, it must use the column-rule `all`."
+  ;                      {::when-matched-clause when-matched-clause}))
+  ;
+  ;      :else
+  ;      (throw (ex-info (str "Unknown `on-match` for Delta merge strategy.")
+  ;                      {::when-matched-clause when-matched-clause})))))
+  ;
+  ;(defn- apply-when-not-matched ^DeltaMergeBuilder
+  ;  [^DeltaMergeBuilder merge-builder when-not-matched]
+  ;  (let [{:zero-one.geni.delta/keys [on-not-match column-rule condition]} when-not-matched
+  ;        not-match-builder (if (nil? condition)
+  ;                            (.whenNotMatched merge-builder)
+  ;                            (.whenNotMatched merge-builder condition))]
+  ;    (cond
+  ;      (and (= on-not-match :insert) (= column-rule :all))
+  ;      (.insertAll not-match-builder)
+  ;
+  ;      (= on-not-match :insert)
+  ;      (let [scala-column-rule (->> column-rule
+  ;                                   (map (fn [[k column]] [(name k) column]))
+  ;                                   (into {})
+  ;                                   (i/->scala-map))]
+  ;        (.insert not-match-builder scala-column-rule))
+  ;
+  ;      :else
+  ;      (throw (ex-info (str "Unknown `on-not-match` for Delta merge strategy.")
+  ;                      {::when-not-matched when-not-matched})))))
+  ;
+  ;(defn merge
+  ;  [^DeltaTable destination ^Dataset source merge-strategy]
+  ;  {:pre [(validate-merge-strategy merge-strategy)]}
+  ;  ;; @todo Update is broken with Spark 3.1 + Delta 0.8. Upgrade Delta ASAP.
+  ;  ;; https://github.com/delta-io/delta/issues/594
+  ;  (let [merge-builder (.merge destination source (::condition merge-strategy))
+  ;        with-on-matched (reduce (fn [builder clause]
+  ;                                  (apply-when-matched-clause builder clause))
+  ;                                merge-builder
+  ;                                (get merge-strategy ::when-matched []))
+  ;        with-not-matched (let [clause (::when-not-matched merge-strategy)]
+  ;                           (if (nil? clause)
+  ;                             with-on-matched
+  ;                             (apply-when-not-matched with-on-matched clause)))]
+  ;    (.execute with-not-matched)))
+  )
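
A sketch of the table-utility API defined above (assumes delta-core 0.8.0 on
the classpath and a Delta table already written at the illustrative path):

    (require '[zero-one.geni.core :as g]
             '[zero-one.geni.delta :as d])

    (def tbl (d/delta-table {:location "/tmp/delta/events"}))

    (d/delta-table? "/tmp/delta/events")          ;; => true
    (-> tbl d/to-df g/count)                      ;; rows in the current snapshot
    (d/delete tbl (g/< (g/col :id) (g/lit 0)))    ;; conditional delete
    (-> tbl (d/history 5) g/show)                 ;; last 5 commits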
diff --git a/test/zero_one/geni/data_sources_test.clj b/test/zero_one/geni/data_sources_test.clj
index 837e7e03..3d245efc 100644
--- a/test/zero_one/geni/data_sources_test.clj
+++ b/test/zero_one/geni/data_sources_test.clj
@@ -5,13 +5,16 @@
    [zero-one.geni.core :as g]
    [zero-one.geni.catalog :as c]
    [zero-one.geni.test-resources :refer [create-temp-file!
+                                         create-temp-dir!
+                                         join-paths
                                          melbourne-df
                                          libsvm-df
                                          spark
                                          reset-session!
                                          delete-warehouse!]])
   (:import
-    (org.apache.spark.sql AnalysisException)))
+    (org.apache.spark.sql AnalysisException)
+    (java.nio.file Paths Path FileSystem)))
 
 (def write-df (-> (melbourne-df) (g/select :Method :Type) (g/limit 5)))
 
@@ -298,3 +301,9 @@
        (g/write-table! dataset table-name)
        (c/table-exists? (c/catalog @spark) "tbl") => true
        (g/collect (g/order-by (g/read-table! table-name) :id)) => (g/collect (g/order-by (g/to-df dataset) :id))))))
+
+(fact "Can read and write Delta tables"
+  (let [temp-dir (join-paths (.toString (create-temp-dir!)) "delta_test")
+        read-df (do (g/write-delta! write-df temp-dir {:mode "overwrite"})
+                    (g/read-delta! temp-dir))]
+    (g/collect write-df) => (g/collect read-df)))
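
The merge API in delta.clj is still commented out pending the Spark 3.1.x
Delta upgrade, but the strategy maps it validates are plain data. The shape,
taken from the specs and the commented tests below, for an upsert (update
matched rows, insert unmatched ones):

    (require '[zero-one.geni.core :as g]
             '[zero-one.geni.delta :as d])

    {::d/condition        (g/=== (g/col :dest.id) (g/col :src.id))
     ::d/when-matched     [{::d/on-match :update ::d/column-rule :all}]
     ::d/when-not-matched {::d/on-not-match :insert ::d/column-rule :all}}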
diff --git a/test/zero_one/geni/delta_test.clj b/test/zero_one/geni/delta_test.clj
new file mode 100644
index 00000000..506bc562
--- /dev/null
+++ b/test/zero_one/geni/delta_test.clj
@@ -0,0 +1,189 @@
+(ns zero-one.geni.delta-test
+  (:require [midje.sweet :refer [facts fact => throws with-state-changes after before]]
+            [zero-one.geni.delta :as d]
+            [zero-one.geni.core :as g]
+            [zero-one.geni.test-resources :as tr])
+  (:import (java.io File)))
+
+(defn stable-collect [ds]
+  (-> ds g/to-df (g/order-by :id) g/collect))
+
+(facts "On Delta table creation"
+  (with-state-changes [(before :facts (tr/reset-session!))
+                       (after :facts (tr/delete-warehouse!))]
+    (fact "Can read and write paths"
+      (let [data (g/range 3)
+            table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")]
+        (g/write-delta! data table-path)
+        (let [delta-tbl (d/delta-table {:location table-path})]
+          (d/delta-table? table-path) => true
+          (g/collect (g/order-by (d/to-df delta-tbl) :id)) => (g/collect (g/order-by (g/to-df data) :id)))))
+    (fact "Can read and write managed tables"
+      (let [data (g/range 3)]
+        (g/write-table! data "test_tbl")
+        (d/convert-to-delta "test_tbl")
+        (let [delta-tbl (d/delta-table {:location "test_tbl"
+                                        :format :table})]
+          (g/collect (g/order-by (d/to-df delta-tbl) :id)) => (g/collect (g/order-by (g/to-df data) :id)))))))
+
+(facts "On simple Delta operations"
+  (let [data (g/range 5)
+        table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")]
+    (g/write-delta! data table-path)
+    (let [delta-tbl (d/delta-table {:location table-path})]
+      (fact "Delete rows by condition"
+        (d/delete delta-tbl (g/>= (g/col :id) (g/lit 3)))
+        (stable-collect (d/to-df delta-tbl)) => (stable-collect (g/range 3)))
+
+      ;; @todo Uncomment these tests once Delta has been upgraded to support Spark 3.1.x
+      ;; These tests pass against Spark 3.0.2
+      ;(fact "Update all rows"
+      ;  (d/update delta-tbl {:id (g/* (g/col :id) 2)})
+      ;  (stable-collect (d/to-df delta-tbl)) => [{:id 0} {:id 2} {:id 4}])
+      ;
+      ;(fact "Update rows by condition"
+      ;  (d/update delta-tbl
+      ;            (g/=== (g/col :id) (g/lit 0))
+      ;            {:id (g/lit 10)})
+      ;  (stable-collect (d/to-df delta-tbl)) => [{:id 2} {:id 4} {:id 10}])
+
+      (fact "Delete all rows"
+        (d/delete delta-tbl)
+        (stable-collect (d/to-df delta-tbl)) => [])
+
+      (fact "Atomic (transactional) appends"
+        (g/write-delta! (g/range 3) table-path {:mode "append"}))
+
+      (fact "Read version history"
+        (-> (d/history delta-tbl)
+            (g/order-by :version)
+            (g/collect-col :operation)) => ["WRITE" "DELETE" "DELETE" "WRITE"]) ;; Should be ["WRITE" "DELETE" "UPDATE" "UPDATE" "DELETE" "WRITE"]
+
+      (fact "Vacuuming files that are no longer active"
+        (g/with-settings
+          @tr/spark
+          {:spark.databricks.delta.retentionDurationCheck.enabled false}
+          (fn [_]
+            (d/vacuum delta-tbl 0.0)))
+
+        ;; Convert to normal parquet (requires vacuuming first)
+        ;; https://docs.delta.io/0.8.0/delta-utility.html#convert-a-delta-table-to-a-parquet-table
+        (-> (tr/join-paths table-path "_delta_log")
+            File.
+            tr/recursive-delete-dir)
+        (stable-collect (g/read-parquet! table-path)) => (stable-collect (g/range 3))))))
+
+;; @todo Uncomment these tests once Delta has been upgraded to support Spark 3.1.x
+;; These tests pass against Spark 3.0.2
+;(facts "On Delta merge"
+;  (let [data (g/records->dataset [{:id 0 :s "A" :b false}
+;                                  {:id 1 :s "B" :b false}
+;                                  {:id 2 :s "C" :b false}])]
+;    (fact "Delete when matched; insert when not matched"
+;      (let [table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")
+;            _ (g/write-delta! data table-path)
+;            delta-tbl (d/delta-table {:location table-path})]
+;        (d/merge (d/as delta-tbl "dest")
+;                 (-> [{:id 1 :s "Z" :b true}
+;                      {:id 3 :s "Z" :b true}]
+;                     g/records->dataset
+;                     (g/as "src"))
+;                 {::d/condition (g/=== (g/col :dest.id) (g/col :src.id))
+;                  ::d/when-matched [{::d/on-match :delete ::d/column-rule :all}]
+;                  ::d/when-not-matched {::d/on-not-match :insert ::d/column-rule :all}})
+;        (stable-collect (d/to-df delta-tbl)) => [{:id 0 :s "A" :b false}
+;                                                 {:id 2 :s "C" :b false}
+;                                                 {:id 3 :s "Z" :b true}]
+;        (tr/recursive-delete-dir (File. table-path))))
+;    (fact "Update all when matched; insert when not matched"
+;      (let [table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")
+;            _ (g/write-delta! data table-path)
+;            delta-tbl (d/delta-table {:location table-path})]
+;        (d/merge (d/as delta-tbl "dest")
+;                 (-> [{:id 1 :s "Z" :b true}
+;                      {:id 3 :s "Z" :b true}]
+;                     g/records->dataset
+;                     (g/as "src"))
+;                 {::d/condition (g/=== (g/col :dest.id) (g/col :src.id))
+;                  ::d/when-matched [{::d/on-match :update ::d/column-rule :all}]
+;                  ::d/when-not-matched {::d/on-not-match :insert ::d/column-rule :all}})
+;        (stable-collect (d/to-df delta-tbl)) => [{:id 0 :s "A" :b false}
+;                                                 {:id 1 :s "Z" :b true}
+;                                                 {:id 2 :s "C" :b false}
+;                                                 {:id 3 :s "Z" :b true}]
+;        (tr/recursive-delete-dir (File. table-path))))
+;    (fact "Update on condition when matched; noop when not matched"
+;      (let [table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")
+;            _ (g/write-delta! data table-path)
+;            delta-tbl (d/delta-table {:location table-path})]
+;        (g/write-delta! data table-path {:mode "overwrite"})
+;        (d/merge (d/as delta-tbl "dest")
+;                 (-> [{:id 1 :s "Z" :b true}
+;                      {:id 2 :s "Z" :b false}
+;                      {:id 3 :s "Z" :b true}]
+;                     g/records->dataset
+;                     (g/as "src"))
+;                 {::d/condition (g/=== (g/col :dest.id) (g/col :src.id))
+;                  ::d/when-matched [{::d/condition (g/col :src.b)
+;                                     ::d/on-match :update
+;                                     ::d/column-rule {:dest.id (g/* (g/col :dest.id) (g/lit 10))}}]})
+;        (stable-collect (d/to-df delta-tbl)) => [{:id 0 :s "A" :b false}
+;                                                 {:id 2 :s "C" :b false}
+;                                                 {:id 10 :s "B" :b false}]))
+;    (fact "Noop when matched; insert on condition when not matched"
+;      (let [table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")
+;            _ (g/write-delta! data table-path)
+;            delta-tbl (d/delta-table {:location table-path})]
+;        (g/write-delta! data table-path {:mode "overwrite"})
+;        (d/merge (d/as delta-tbl "dest")
+;                 (-> [{:id 1 :s "Z" :b true}
+;                      {:id 3 :s "Z" :b false}
+;                      {:id 4 :s "Z" :b true}]
+;                     g/records->dataset
+;                     (g/as "src"))
+;                 {::d/condition (g/=== (g/col :dest.id) (g/col :src.id))
+;                  ::d/when-not-matched {::d/condition (g/col :src.b)
+;                                        ::d/on-not-match :insert
+;                                        ::d/column-rule :all}})
+;        (stable-collect (d/to-df delta-tbl)) => [{:id 0 :s "A" :b false}
+;                                                 {:id 1 :s "B" :b false}
+;                                                 {:id 2 :s "C" :b false}
+;                                                 {:id 4 :s "Z" :b true}]))
+;    (fact "Update on condition when matched; update all when matched; noop when not matched."
+;      (let [table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")
+;            _ (g/write-delta! data table-path)
+;            delta-tbl (d/delta-table {:location table-path})]
+;        (g/write-delta! data table-path {:mode "overwrite"})
+;        (d/merge (d/as delta-tbl "dest")
+;                 (-> [{:id 0 :s "Z" :b false}
+;                      {:id 1 :s "Z" :b true}
+;                      {:id 2 :s "Z" :b false}]
+;                     g/records->dataset
+;                     (g/as "src"))
+;                 {::d/condition (g/=== (g/col :dest.id) (g/col :src.id))
+;                  ::d/when-matched [{::d/condition (g/col :src.b)
+;                                     ::d/on-match :update
+;                                     ::d/column-rule {:dest.id (g/* (g/col :dest.id) (g/lit 10))}}
+;                                    {::d/on-match :update
+;                                     ::d/column-rule :all}]})
+;        (stable-collect (d/to-df delta-tbl)) => [{:id 0 :s "Z" :b false}
+;                                                 {:id 2 :s "Z" :b false}
+;                                                 {:id 10 :s "B" :b false}]))
+;    (fact "Noop when matched; insert with column rule when not matched"
+;      (let [table-path (tr/join-paths (.toString (tr/create-temp-dir!)) "delta_test")
+;            _ (g/write-delta! data table-path)
+;            delta-tbl (d/delta-table {:location table-path})]
+;        (g/write-delta! data table-path {:mode "overwrite"})
+;        (d/merge (d/as delta-tbl "dest")
+;                 (-> [{:id 3 :s "Z" :b false}
+;                      {:id 4 :s "Z" :b true}]
+;                     g/records->dataset
+;                     (g/as "src"))
+;                 {::d/condition (g/=== (g/col :dest.id) (g/col :src.id))
+;                  ::d/when-not-matched {::d/condition (g/col :src.b)
+;                                        ::d/on-not-match :insert
+;                                        ::d/column-rule {:dest.id (g/* (g/col :src.id) (g/lit 10))}}})
+;        (stable-collect (d/to-df delta-tbl)) => [{:id 0 :s "A" :b false}
+;                                                 {:id 1 :s "B" :b false}
+;                                                 {:id 2 :s "C" :b false}
+;                                                 {:id 40 :s nil :b nil}]))))