Adds partial Delta Lake support #328

Draft: wants to merge 5 commits into develop
docker/project.clj (1 addition, 0 deletions)
@@ -9,6 +9,7 @@
[org.apache.spark/spark-mllib_2.12 "3.1.1"]
[org.apache.spark/spark-sql_2.12 "3.1.1"]
[org.apache.spark/spark-streaming_2.12 "3.1.1"]
+[io.delta/delta-core_2.12 "0.8.0"]
; Arrow
[org.apache.arrow/arrow-memory-netty "3.0.0"]
[org.apache.arrow/arrow-memory-core "3.0.0"]
project.clj (1 addition, 0 deletions)
@@ -9,6 +9,7 @@
[org.apache.spark/spark-mllib_2.12 "3.1.1"]
[org.apache.spark/spark-sql_2.12 "3.1.1"]
[org.apache.spark/spark-streaming_2.12 "3.1.1"]
+[io.delta/delta-core_2.12 "0.8.0"]
; Arrow
[org.apache.arrow/arrow-memory-netty "3.0.0"]
[org.apache.arrow/arrow-memory-core "3.0.0"]
src/clojure/zero_one/geni/core.clj (5 additions, 0 deletions)
@@ -97,6 +97,8 @@
[zero-one.geni.spark
create-spark-session
spark-conf
+set-settings
+with-settings
sql])

(import-vars
@@ -480,6 +482,7 @@
rollup
sample
sample-by
+schema
select
select-expr
show
@@ -522,6 +525,7 @@
read-avro!
read-binary!
read-csv!
+read-delta!
read-edn!
read-jdbc!
read-json!
@@ -532,6 +536,7 @@
read-xlsx!
write-avro!
write-csv!
+write-delta!
write-edn!
write-jdbc!
write-json!
src/clojure/zero_one/geni/core/data_sources.clj (28 additions, 0 deletions)
@@ -384,3 +384,31 @@
view, e.g. `SELECT * FROM global_temp.view1`."
[^Dataset dataframe ^String view-name]
(.createOrReplaceGlobalTempView dataframe view-name))


+;; Delta
+(defmulti read-delta!
+  "Loads a Delta table from a directory and returns the results as a DataFrame.
+
+   Spark's DataFrameReader options may be passed in as a map of options.
+
+   See: https://spark.apache.org/docs/latest/sql-data-sources.html
+   See: https://docs.delta.io/latest/quick-start.html#read-data"
+  (fn [head & _] (class head)))
+(defmethod read-delta! :default
+  ([path] (read-delta! @defaults/spark path))
+  ([path options] (read-delta! @defaults/spark path options)))
+(defmethod read-delta! SparkSession
+  ([spark path] (read-delta! spark path {}))
+  ([spark path options] (read-data! "delta" spark path options)))
+
+
+(defn write-delta!
+  "Writes a Delta table to the specified path.
+
+   Spark's DataFrameWriter options may be passed in as a map of options.
+
+   See: https://spark.apache.org/docs/latest/sql-data-sources.html
+   See: https://docs.delta.io/latest/quick-start.html#create-a-table"
+  ([dataframe path] (write-delta! dataframe path {}))
+  ([dataframe path options] (write-data! "delta" dataframe path options)))
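
For illustration, reading and writing a Delta table through these new functions might look like the following sketch (the path, the sample data, and the :mode option are illustrative, and io.delta/delta-core must be on the classpath):

(require '[zero-one.geni.core :as g])

(def df (g/table->dataset [[1 "a"] [2 "b"]] [:id :label]))

;; Write the DataFrame out as a Delta table, then read it back.
(g/write-delta! df "target/delta/demo" {:mode "overwrite"})
(g/show (g/read-delta! "target/delta/demo"))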
src/clojure/zero_one/geni/core/dataset.clj (6 additions, 1 deletion)
@@ -13,7 +13,8 @@
[zero-one.geni.interop :as interop]
[zero-one.geni.utils :refer [ensure-coll]])
(:import
-   (org.apache.spark.sql Column)))
+   (org.apache.spark.sql Column Dataset)
+   (org.apache.spark.sql.types StructType)))

;;;; Actions
(defn- collected->maps [collected]
@@ -38,6 +39,10 @@
(defn take [dataframe n-rows]
(-> dataframe (.take n-rows) collected->maps))

+(defn schema ^StructType
+  [^Dataset dataframe]
+  (.schema dataframe))
+
(defn show
([dataframe] (show dataframe {}))
([dataframe options]
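
For illustration, the new schema function surfaces Spark's StructType directly; a hypothetical REPL interaction (df as above):

(g/schema df)
;; => a StructType such as
;;    StructType(StructField(id,LongType,true), StructField(label,StringType,true))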
src/clojure/zero_one/geni/defaults.clj (8 additions, 3 deletions)
@@ -1,10 +1,15 @@
(ns zero-one.geni.defaults
  (:require
-   [zero-one.geni.spark]))
+   [zero-one.geni.spark]
+   [zero-one.geni.interop :as iop]))

(def session-config
-  {:configs {:spark.sql.adaptive.enabled "true"
-             :spark.sql.adaptive.coalescePartitions.enabled "true"}
+  {:configs (merge {:spark.sql.adaptive.enabled "true"
+                    :spark.sql.adaptive.coalescePartitions.enabled "true"}
+                   (when (iop/class-exists? 'io.delta.sql.DeltaSparkSessionExtension)
+                     ;; Required for writing Delta tables.
+                     {:spark.sql.extensions "io.delta.sql.DeltaSparkSessionExtension"
+                      :spark.sql.catalog.spark_catalog "org.apache.spark.sql.delta.catalog.DeltaCatalog"}))
   :checkpoint-dir "target/checkpoint/"})

(def spark
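
For illustration, when delta-core is on the classpath the merged session-config resolves to the map below; without it, the two Delta keys are simply absent:

{:configs {:spark.sql.adaptive.enabled "true"
           :spark.sql.adaptive.coalescePartitions.enabled "true"
           :spark.sql.extensions "io.delta.sql.DeltaSparkSessionExtension"
           :spark.sql.catalog.spark_catalog "org.apache.spark.sql.delta.catalog.DeltaCatalog"}
 :checkpoint-dir "target/checkpoint/"}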
src/clojure/zero_one/geni/delta.clj (new file, 205 additions)
@@ -0,0 +1,205 @@
(ns zero-one.geni.delta
(:refer-clojure :exclude [update merge])
(:require [clojure.spec.alpha :as s]
[zero-one.geni.core :as g]
[zero-one.geni.defaults :as defaults]
[zero-one.geni.interop :as i]
[zero-one.geni.utils :refer [with-dynamic-import]])
(:import (org.apache.spark.sql Column Dataset SparkSession)
(java.util Map)))

(s/def ::format #{:path :table})
(s/def ::column-name (s/or :kw keyword? :str string?))
(s/def ::column #(instance? Column %))
(s/def ::condition ::column)
(s/def ::on-match #{:delete :update})
(s/def ::on-not-match #{:insert})

(s/def ::column-rule
(s/or :some (s/map-of ::column-name ::column)
:all #{:all}))

(s/def ::when-matched-clause
(s/keys :req [::on-match ::column-rule]
:opt [::condition]))

(s/def ::when-matched
(s/coll-of ::when-matched-clause :kind sequential? :max-count 2))

(s/def ::when-not-matched
(s/keys :req [::on-not-match ::column-rule]
:opt [::condition]))

(s/def ::merge-strategy
(s/keys :req [::condition]
:opt [::when-matched ::when-not-matched]))
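
For illustration, a map conforming to ::merge-strategy might look like this inside the namespace (the table and column names are hypothetical):

{::condition        (g/expr "target.id = source.id")
 ::when-matched     [{::on-match    :update
                      ::column-rule {:value (g/expr "source.value")}}]
 ::when-not-matched {::on-not-match :insert
                     ::column-rule  :all}}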

;; @todo Uncomment this code once Delta has been upgraded to support Spark 3.1.x
;; This code has been tested against Spark 3.0.2
;(defn validate-merge-strategy
;  "See constraints: https://docs.delta.io/latest/api/scala/io/delta/tables/DeltaMergeBuilder.html"
;  [merge-strategy]
;  {:pre [(s/valid? ::merge-strategy merge-strategy)]}
;  (let [when-matched (get merge-strategy ::when-matched [])]
;    ;; There can be at most one `update` action and one `delete` action in the when-matched clauses.
;    (let [clause-counts (->> when-matched
;                             (group-by ::on-match)
;                             (map (fn [[k v]] [k (count v)])))]
;      (doseq [[clause-kind clause-count] clause-counts]
;        (when (> clause-count 1)
;          (throw (ex-info (str "Found multiple " clause-kind " clauses in a Delta merge strategy. "
;                               "There can be at most 1 update and 1 delete.")
;                          {:clause-kind     clause-kind
;                           :clause-count    clause-count
;                           ::merge-strategy merge-strategy})))))
;    ;; If there are two when-matched clauses, then the first one must have a condition.
;    (when (and (= (count when-matched) 2)
;               (nil? (::condition (first when-matched))))
;      (throw (ex-info "When using two when-matched clauses in a Delta merge, the first one must have a condition."
;                      {::merge-strategy merge-strategy}))))
;  true)
;
;(defn- prepare-update-set-map ^Map
;  [m]
;  (->> m
;       (map (fn [[k v]] [(name k) (g/col v)]))
;       (into {})))

(with-dynamic-import
[[io.delta.tables DeltaTable DeltaMergeBuilder]]

(defn delta-table ^DeltaTable
([opts] (delta-table @defaults/spark opts))
([^SparkSession spark {:keys [format location] :or {format :path}}]
{:pre [(s/valid? ::format format)]}
(if (= format :path)
(. DeltaTable forPath spark location)
(. DeltaTable forName spark location))))

(defn delta-table?
([^String identifier]
(DeltaTable/isDeltaTable identifier))
([^SparkSession spark ^String identifier]
(DeltaTable/isDeltaTable spark identifier)))

(defn as ^DeltaTable
[^DeltaTable table ^String alias]
;; @todo move to polymorphic.clj? How to handle dynamic import in multimethod?
(.as table alias))

(defn to-df ^Dataset
[^DeltaTable table]
;; @todo move to polymorphic.clj? How to handle dynamic import in multimethod?
(.toDF table))

(defmulti convert-to-delta (fn [head & _] (class head)))
(defmethod convert-to-delta :default
[^String identifier & {:keys [partition-schema]}]
(convert-to-delta @defaults/spark identifier :partition-schema partition-schema))
(defmethod convert-to-delta SparkSession
[^SparkSession spark ^String identifier & {:keys [partition-schema]}]
(if (nil? partition-schema)
(DeltaTable/convertToDelta spark identifier)
(DeltaTable/convertToDelta spark identifier (g/->schema partition-schema))))

(defmulti delete (fn [& args] (class (last args))))
(defmethod delete :default
[^DeltaTable table]
(.delete table))
(defmethod delete Column
[^DeltaTable table ^Column condition]
(.delete table condition))

(defn history ^Dataset
([^DeltaTable table]
(.history table))
([^DeltaTable table ^Integer limit]
(.history table limit)))

(defn vacuum
([^DeltaTable table]
(.vacuum table))
([^DeltaTable table ^Double retention-hours]
(.vacuum table retention-hours)))
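
For illustration, the table helpers above compose as follows, assuming a Delta table already exists at a hypothetical path:

(def dt (delta-table {:format :path :location "target/delta/demo"}))

(delta-table? "target/delta/demo")  ;; => true
(g/show (to-df dt))                 ;; query the table as a DataFrame
(delete dt (g/expr "id = 1"))       ;; delete only the matching rows
(g/show (history dt))               ;; one row per table version
(vacuum dt 168.0)                   ;; drop unreferenced files older than 168 hours (the default retention)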

;; @todo Uncomment this code once Delta has been upgraded to support Spark 3.1.x
;; This code has been tested against Spark 3.0.2
;(defn update
; ;; @todo Update is broken with Spark 3.1 + Delta 0.8. upgrade Delta ASAP.
; ;; https://github.com/delta-io/delta/issues/594
; ([^DeltaTable table set-to]
; (let [m (prepare-update-set-map set-to)]
; (.update table m)))
; ([^DeltaTable table ^Column condition set-to]
; (.update table condition (prepare-update-set-map set-to))))
;
;(defn- apply-when-matched-clause ^DeltaMergeBuilder
; [^DeltaMergeBuilder merge-builder when-matched-clause]
; (let [{:zero-one.geni.delta/keys [condition on-match column-rule]} when-matched-clause
; on-match-builder (if (nil? condition)
; (.whenMatched merge-builder)
; (.whenMatched merge-builder condition))]
; (cond
; (and (= on-match :update) (= column-rule :all))
; (.updateAll on-match-builder)
;
; (= on-match :update)
; (let [scala-column-rule (->> column-rule
; (map (fn [[k column]] [(name k) column]))
; (into {})
; (i/->scala-map))]
; (.update on-match-builder scala-column-rule))
;
; (and (= on-match :delete) (= column-rule :all))
; (.delete on-match-builder)
;
; (= on-match :delete)
;      (throw (ex-info "If a Delta merge `when-matched` clause is set to `:delete`, it must use the column-rule `:all`."
;                      {::when-matched-clause when-matched-clause}))
;
; :else
;      (throw (ex-info "Unknown `on-match` for Delta merge strategy."
;                      {::when-matched-clause when-matched-clause})))))
;
;(defn- apply-when-not-matched ^DeltaMergeBuilder
; [^DeltaMergeBuilder merge-builder when-not-matched]
; (let [{:zero-one.geni.delta/keys [on-not-match column-rule condition]} when-not-matched
; not-match-builder (if (nil? condition)
; (.whenNotMatched merge-builder)
; (.whenNotMatched merge-builder condition))]
; (cond
; (and (= on-not-match :insert) (= column-rule :all))
; (.insertAll not-match-builder)
;
; (= on-not-match :insert)
; (let [scala-column-rule (->> column-rule
; (map (fn [[k column]] [(name k) column]))
; (into {})
; (i/->scala-map))]
; (.insert not-match-builder scala-column-rule))
;
; :else
;      (throw (ex-info "Unknown `on-not-match` for Delta merge strategy."
;                      {::when-not-matched when-not-matched})))))
;
;(defn merge
; [^DeltaTable destination ^Dataset source merge-strategy]
; {:pre [(validate-merge-strategy merge-strategy)]}
; ;; @todo Update is broken with Spark 3.1 + Delta 0.8. upgrade Delta ASAP.
; ;; https://github.com/delta-io/delta/issues/594
; (let [merge-builder (.merge destination source (::condition merge-strategy))
; with-on-matched (reduce (fn [builder clause]
; (apply-when-matched-clause builder clause))
; merge-builder
; (get merge-strategy ::when-matched []))
; with-not-matched (let [clause (::when-not-matched merge-strategy)]
; (if (nil? clause)
; with-on-matched
; (apply-when-not-matched with-on-matched clause)))]
; (.execute with-not-matched)))
)
src/clojure/zero_one/geni/interop.clj (14 additions, 5 deletions)
@@ -4,6 +4,7 @@
[clojure.java.data :as j]
[clojure.string :refer [replace-first]]
[clojure.walk :as walk]
+   [clojure.reflect :refer [resolve-class]]
[zero-one.geni.docs :as docs]
[zero-one.geni.utils :refer [ensure-coll]])
(:import
@@ -20,8 +21,9 @@
Function3
Tuple2
Tuple3)
-   (scala.collection JavaConversions Map Seq)
-   (scala.collection.convert Wrappers$IterableWrapper)))
+   (scala.collection Map Seq)
+   (scala.collection.convert Wrappers$IterableWrapper)
+   (scala.collection JavaConverters)))

(declare ->clojure)

@@ -44,15 +46,15 @@
(instance? Tuple3 value))

(defn scala-seq->vec [scala-seq]
-  (vec (JavaConversions/seqAsJavaList scala-seq)))
+  (-> scala-seq (JavaConverters/seqAsJavaList) vec))

(defn scala-map->map [^Map m]
(into {}
-        (for [[k v] (JavaConversions/mapAsJavaMap m)]
+        (for [[k v] (JavaConverters/mapAsJavaMap m)]
[k (->clojure v)])))

(defn ->scala-seq [coll]
-  (JavaConversions/asScalaBuffer (seq coll)))
+  (JavaConverters/asScalaBuffer (seq coll)))

(defn ->scala-tuple2 [coll]
(Tuple2. (first coll) (second coll)))
@@ -75,6 +77,13 @@
(defn ->scala-function3 [f]
(reify Function3 (apply [_ x y z] (f x y z))))

+(defn ->scala-map
+  [m]
+  (JavaConverters/mapAsScalaMap m))
+
+(defn class-exists? [c]
+  (resolve-class (.getContextClassLoader (Thread/currentThread)) c))

(defn optional->nillable [value]
(when (.isPresent value)
(.get value)))
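
For illustration, class-exists? takes a symbol and is truthy only when the class resolves on the current thread's context classloader, which is what lets defaults.clj toggle the Delta session settings:

(class-exists? 'io.delta.sql.DeltaSparkSessionExtension)
;; => truthy when delta-core is on the classpath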