Delta Lake documentation and sample code (#14)
* test

* removing test file

* adding Delta Lake documentation

* ODI and Data Flow Integration

* ODI and Data Flow Integration

* ODI and Data Flow Integration

* Delta Lake integration documentation and sample code.

* Delta Lake integration documentation and sample code.

* Delta Lake integration documentation and sample code.

Co-authored-by: Mohan Kumar <[email protected]>
mohanln-cloud and Mohan Kumar authored Jul 29, 2022
1 parent 579fa7b commit 9caa900
Showing 10 changed files with 4 additions and 25 deletions.
5 changes: 0 additions & 5 deletions deltalake/python/delta_lake_sample.py
@@ -38,7 +38,6 @@
deltaTablePath = sys.argv[2]
parquetTablePath = sys.argv[3]


original_df = spark.read.format("csv").option("header", "true").load(csvFilePath).withColumn("time_stamp", current_timestamp())
original_df.write.partitionBy("vendor_id").format("delta").mode("overwrite").save(deltaTablePath)
original_df.write.mode("overwrite").parquet(parquetTablePath)
@@ -66,7 +65,6 @@
df2_1 = spark.sql("SELECT count(*) FROM delta.`" + deltaTablePath + "@v1`")
df2_1.show()


print("\nHistory for versionAsOf 3" + deltaTablePath)
df3 = spark.read.format("delta").option("versionAsOf", 1).load(deltaTablePath)
df3.show()
@@ -79,11 +77,8 @@
describe = spark.sql("DESCRIBE HISTORY delta.`" + deltaTablePath + "`");
describe.show()


spark.sql("CONVERT TO DELTA parquet.`" + parquetTablePath + "`");



print("\nStarting SQL DeltaTable operations on " + deltaTablePath)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, deltaTablePath)
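The Python sample's final hunk is truncated here. As a rough, hypothetical sketch of the kind of programmatic `DeltaTable` operations the `delta.tables` API exposes (reusing the `spark` session and `deltaTablePath` defined earlier in the file; the filter values are placeholders, not taken from this sample):

```python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, deltaTablePath)

# Conditional update and delete through the programmatic API
# ('1' and '2' are placeholder vendor_id values).
deltaTable.update(condition="vendor_id = '1'",
                  set={"time_stamp": "current_timestamp()"})
deltaTable.delete("vendor_id = '2'")

# Table history as a DataFrame (last 10 commits).
deltaTable.history(10).show()
```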
7 changes: 3 additions & 4 deletions deltalake/readme.md
@@ -7,9 +7,9 @@ To learn more, get started [here](https://docs.oracle.com/en-us/iaas/data-flow/usi

Delta Lake 1.2.1 is now integrated into the Data Flow Spark 3.2.1 processing engine. To use this feature,
set your application’s Spark version to Spark 3.2.1 from the console or CLI.
Use the `delta` format as mentioned in the documentation. See the Delta Lake release [notes](https://github.com/delta-io/delta/releases/tag/v1.2.1) and [documentation](https://docs.delta.io/latest/delta-intro.html#) for further details. Some sample usage is shown below.
Use the `delta` format as mentioned in the documentation. See the Delta Lake release [notes](https://github.com/delta-io/delta/releases/tag/v1.2.1) and [documentation](https://docs.delta.io/latest/delta-intro.html#) for further details.

Running the Delta Lake API is as easy as using any other format; the Data Flow Spark engine supports `delta` by default. Delta Lake APIs are available for Java, Scala, and Python. Include the delta-spark package via `pip install delta-spark` if you are using the custom archive.zip dependency packager.
The OCI Data Flow Spark engine supports the `delta` format by default, and running the Delta Lake API is as easy as using any other format. Delta Lake APIs are available for Java, Scala, and Python. Include the `delta-spark` Python package as described in [Adding Third-Party Libraries](https://docs.oracle.com/en-us/iaas/data-flow/using/third-party-libraries.htm) if you are using the custom archive.zip dependency packager.

Java/Scala
> spark.read().format("delta").load(deltaTablePath)
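
Python (a minimal sketch, not part of this commit; the OCI paths below are placeholders, and the snippet assumes a Data Flow application already set to Spark 3.2.1 with `delta-spark` available):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.appName("delta-sample").getOrCreate()

csv_path = "oci://<bucket>@<namespace>/samplecsv.csv"        # placeholder
delta_path = "oci://<bucket>@<namespace>/delta/sample-table"  # placeholder

# Write in delta format, partitioned the same way as the sample application.
df = spark.read.format("csv").option("header", "true").load(csv_path)
df.withColumn("time_stamp", current_timestamp()) \
  .write.partitionBy("vendor_id").format("delta").mode("overwrite").save(delta_path)

# Read the current table, and an earlier version via time travel.
latest = spark.read.format("delta").load(delta_path)
v1 = spark.read.format("delta").option("versionAsOf", 1).load(delta_path)

# Inspect the table history through Spark SQL.
spark.sql("DESCRIBE HISTORY delta.`" + delta_path + "`").show()
```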
@@ -70,5 +70,4 @@ Arguments: oci://<location>/samplecsv.csv oci://<location>/delta/spark-DeltaTabl
Main class : com.oracle.delta.GenerateCSVData
Arguments: oci://<location>/samplecsv.csv oci://<location>/delta/spark-DeltaTable oci://<location>/delta/spark-ParquetToDeltaTable <sleepTimeInSec> <totalRuns>
```
`Note: Build the jar artifact with "mvn clean install"`

`Note: Build the jar artifact with "mvn clean install"`
@@ -16,9 +16,7 @@ object DeltaStreamRun {
val checkpoint = args(3)

println("\n" + inputPath + ", " + deltaPath + ", " + checkpoint)

DeltaTable.StartDeltaStreamSink(inputPath, parquetPath, checkpoint)

}

}
@@ -24,11 +24,8 @@ object GenerateCSVData {
println("Wakeup " + count + " -- " + System.currentTimeMillis())
DeltaTable.generateCSVData(inputPath, parquetPath + s"-$count")


Thread.sleep(1000 * sleepTimeInSec.toInt)
count = count + 1
}

}

}
@@ -27,7 +27,5 @@ object LongRunDelta {
Thread.sleep(1000 * sleepTimeInSec.toInt)
count = count + 1
}

}

}
@@ -17,7 +17,6 @@ object TPCDSReader extends App{
.filter(s => s.startsWith("query")).toList

// println(lines)
// Users/mohankumarln/Desktop/TPC-DS-Spark321/Spark 3.2.1 SP1000-run3

lines.foreach(line =>
{
2 changes: 0 additions & 2 deletions deltalake/scala/src/main/scala/com/oracle/spark/LongRun.scala
@@ -27,7 +27,5 @@ object LongRun {
Thread.sleep(1000 * sleepTimeInSec.toInt)
count = count + 1
}

}

}
2 changes: 0 additions & 2 deletions deltalake/scala/src/main/scala/com/oracle/spark/Main.scala
@@ -16,7 +16,6 @@ object Main {
val inputPath = args(0)
val outputPath = args(1)


val spark = SparkSession
.builder()
.appName("Spark Simulation")
@@ -28,7 +27,6 @@ object Main {
.option("header", "true")
.load(inputPath)


original_df
.withColumn("time_stamp", current_timestamp())
.write.partitionBy("vendor_id")
3 changes: 0 additions & 3 deletions deltalake/scala/src/main/scala/com/oracle/spark/Utils.scala
@@ -29,7 +29,4 @@ object Utils {
original_df.write.partitionBy("vendor_id").mode("overwrite").parquet(outputPath)

}



}
2 changes: 1 addition & 1 deletion odi-dataflow/readme.md
@@ -2,5 +2,5 @@
This repository provides examples demonstrating how to use Oracle Data Integrator to integrate with Oracle Cloud Infrastructure Data Flow.

1. Download and unzip the archive file.
2. Import [KM_LKM_Spark_DataFlow_to_Object_Storage_REST.xml](https://github.com/oracle/oracle-dataflow-samples/odi-dataflow/KM_LKM_Spark_DataFlow_to_Object_Storage_REST.xml) into your project or Global KMs area.
2. Import [KM_LKM_Spark_DataFlow_to_Object_Storage_REST.xml](https://github.com/oracle/oracle-dataflow-samples/ODI-DataFlow/KM_LKM_Spark_DataFlow_to_Object_Storage_REST.xml) into your project or Global KMs area.
3. Follow the blog post for guidance on how to configure and use the KM.
