-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmergeDiff.sc
30 lines (27 loc) · 1.09 KB
/
mergeDiff.sc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import $ivy.`org.apache.flink:flink-table-runtime:1.17.1`
import $ivy.`org.apache.flink:flink-table-planner-loader:1.17.1`
import $ivy.`org.apache.flink:flink-connector-base:1.17.1`
import $ivy.`org.apache.flink:flink-connector-files:1.17.1`
import $ivy.`org.apache.flink:flink-clients:1.17.1`
import $ivy.`org.slf4j:slf4j-log4j12:1.7.15`
import $ivy.`org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0`
import $ivy.`org.apache.paimon:paimon-flink-1.17:0.5.0-incubating`
import org.apache.paimon.flink.action.MergeIntoAction
val warehouse = "file:/tmp/paimon"
val catalog = "my_catalog"
val database = "default"
new MergeIntoAction(warehouse, database, "country_sales")
.withTargetAlias("cs")
.withSourceTable("S")
.withSourceSqls(
s"""CREATE CATALOG $catalog WITH (
'type'='paimon',
'warehouse'='$warehouse'
);""",
s"USE CATALOG $catalog",
s"CREATE TEMPORARY VIEW S AS SELECT distinct country from `$database`.customers"
)
.withMergeCondition("cs.country = S.country")
.withNotMatchedInsert(null, "S.country, 0, 0")
.withNotMatchedBySourceDelete(null)
.run()