Dataflow official public github example as per code editor blog usecase (#47)

* Sample to read a CSV from Object Storage, write to the OCI metastore, read from the metastore, write to Oracle ADW, then read and show the data from Oracle ADW

* Dataflow official public github example as per code editor blog usecase
TejashwaKVerma authored Apr 12, 2023
1 parent e2bb85e commit 8eef17f
Showing 2 changed files with 57 additions and 32 deletions.
22 changes: 11 additions & 11 deletions python/csv_metastore_adw/README.md
@@ -17,19 +17,19 @@ Upload a sample CSV file to OCI object store.

## Application Setup

Customize ```csvToMetastoreToADW.py``` with:

* Set INPUT_PATH to the OCI path of your CSV data.
* Set PASSWORD to the password of the ADW instance.
* Set databaseName to the database in ADW where the data is to be written.
* Set tableName to the table in ADW where the data is to be written.
* Set CONNECTION_ID to a TNS name valid for the database.
* Set USER to the user who generated the wallet file.
* Set WALLET_PATH to the path on object store for the wallet.
Customize (if required) ```csvToMetastoreToADW.py``` with:

Test the Application Locally (recommended):
You can test the application locally in the Code Editor Data Flow plugin using Run Locally:

```sh
Language: Python
FileName: csvToMetastoreToADW.py
Enable Spark Oracle data source property
Enable Spark Oracle metastore property
Select compartment
Select metastore
Arguments: --table <metastore & ADW table name> --database <metastore database name> --input <oci://bucket@namespace/sample.csv> --walletUri <oci://bucket@namespace/Wallet.zip> --user <user who generated the wallet file> --password <password to the database> --connection <TNS name valid for the database>
```
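
For reference, every flag in the Arguments field above is declared as a required argparse parameter in the script (see the diff further down); a minimal sketch of that argument handling:

```python
# Minimal sketch of the script's argument handling: each flag passed in
# the "Arguments" field above is declared as required.
import argparse

parser = argparse.ArgumentParser()
for flag in ("--table", "--database", "--input",
             "--walletUri", "--user", "--password", "--connection"):
    parser.add_argument(flag, required=True)
args = parser.parse_args()
# e.g. args.input holds the oci://bucket@namespace/sample.csv path
```
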
## Deploy and Run the Application

* Copy csvToMetastoreToADW.py to object store, or upload it using the Data Flow upload artifact utility.
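
If you prefer to script the copy step, here is a sketch using the OCI Python SDK (assumes a configured ~/.oci/config profile; the bucket name is a placeholder):

```python
# Hedged sketch: upload the application file to Object Storage via the
# OCI Python SDK. Replace <bucket> with your bucket name.
import oci

config = oci.config.from_file()  # default profile from ~/.oci/config
client = oci.object_storage.ObjectStorageClient(config)
namespace = client.get_namespace().data

with open("csvToMetastoreToADW.py", "rb") as f:
    client.put_object(namespace, "<bucket>", "csvToMetastoreToADW.py", f)
```
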
@@ -51,7 +51,7 @@ oci data-flow application create \
--num-executors 1 \
--spark-version 2.4.4 \
--file-uri oci://<bucket>@<namespace>/csvToMetastoreToADW.py \
--arguments --table <table_name>
--arguments --table <metastore & ADW table name> --database <metastore database name> --input <oci://bucket@namespace/sample.csv> --walletUri <oci://bucket@namespace/Wallet.zip> --user <user who generated the wallet file> --password <password to the database> --connection <TNS name valid for the database> \
--language Python
oci data-flow run create \
--application-id <application_ocid> \
67 changes: 46 additions & 21 deletions python/csv_metastore_adw/csvToMetastoreToADW.py
@@ -2,36 +2,51 @@
A simple example demonstrating the usage of the OCI metastore and the Oracle datasource.
"""
import argparse
import os


from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StringType


def oracle_datasource_example(spark):

#properties = {"adbId": ADB_ID, "user": USER, "password": PASSWORD}
properties = {"walletUri": walletUri, "user": USER, "password": PASSWORD}

print("Step 1: Read csv from object storage")
src_df = spark.read.options(delimiter=',').option("header",True).csv("INPUT_PATH")
src_df = spark.read.options(delimiter=',').option("header",True).csv(INPUT_PATH)
dataList = []

print("Reading data from object storage !")
src_df.show()
print("================================================================================================")

print("Step 2: Write csv data into Metastore")

print("Step 2: Write csv data schema into Metastore")

spark.sql("use " + databaseName)
spark.sql("show tables").show(30)

spark.sql("create database IF NOT EXISTS " + databaseName)
print("Successfully created database: " + databaseName)
src_df.write.mode("overwrite").saveAsTable(databaseName + "." + tableName)
print("Wrote data in Database: " + databaseName + " ; table: " + tableName)
print("================================================================================================")

print("Step 3: Read data from Metastore and write into ADW")
tableDf = spark.sql("select * from " + databaseName + "." + tableName);
metaSchema = src_df.schema

for i, cols in enumerate(metaSchema.fields):
dataList.append(cols.name)
schemaDf = spark.createDataFrame(dataList, StringType())
schemaDf.show(truncate=False)

schemaDf.write.mode("overwrite").saveAsTable(databaseName + "." + tableName)
print("Wrote data schema in Database: " + databaseName + " ; table: " + tableName)

print("Read data schema from Metastore")
tableDf = spark.sql("select * from " + databaseName + "." + tableName)
print("Reading data from metastore !")
tableDf.show()
# Note: providing connectionId is optional with adbId
print("Writing data into ADW");
tableDf.write.format("oracle") \
print("================================================================================================")
print("Step 3: Writing data into ADW");
src_df.write.format("oracle") \
.options(**properties).option("dbtable",tableName) \
.option("connectionId", CONNECTION_ID) \
.mode("Overwrite") \
@@ -41,21 +56,31 @@ def oracle_datasource_example(spark):
adwDf = spark.read.format("oracle").options(**properties).option("dbtable",tableName).option("connectionId", CONNECTION_ID).load()
adwDf.show()

class customCol:
def __init__(self, name, dataType):
self.name = name
self.dataType = dataType

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--table", required=True)
parser.add_argument("--input", required=True)
parser.add_argument("--walletUri", required=True)
parser.add_argument("--user", required=True)
parser.add_argument("--password", required=True)
parser.add_argument("--connection", required=True)
parser.add_argument("--database", required=True)
args = parser.parse_args()
spark = SparkSession.builder.appName("Python Spark Oracle Datasource Example").enableHiveSupport().getOrCreate()


# TODO: Set all these variables.
INPUT_PATH = "oci://<bucket>@<tenancy>/fake_data.csv"
walletUri = "oci://<bucket>@<tenancy>/<wallet>.zip"
USER = "ADMIN"
PASSWORD = "<ADW Password>>"
CONNECTION_ID = "<tnsname>"
databaseName = "<db_name>"
INPUT_PATH = args.input
walletUri = args.walletUri
USER = args.user
PASSWORD = args.password
CONNECTION_ID = args.connection
databaseName = args.database
tableName = args.table
oracle_datasource_example(spark)
spark.stop()
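
Taken together, the changed script follows a read-transform-write pattern; below is a condensed sketch of that flow (bucket, wallet, database, and credential values are placeholders; the "oracle" format and the walletUri/connectionId options assume the Spark Oracle datasource available in OCI Data Flow):

```python
# Condensed sketch of the pipeline implemented by csvToMetastoreToADW.py.
# All oci:// paths, names, and credentials below are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = (SparkSession.builder
         .appName("csv-metastore-adw-sketch")
         .enableHiveSupport()  # needed so saveAsTable hits the OCI metastore
         .getOrCreate())

props = {"walletUri": "oci://<bucket>@<namespace>/Wallet.zip",
         "user": "<user>", "password": "<password>"}

# Step 1: read the CSV from Object Storage.
df = (spark.read.options(delimiter=",").option("header", True)
      .csv("oci://<bucket>@<namespace>/sample.csv"))

# Step 2: persist the column names as a metastore table.
spark.sql("create database IF NOT EXISTS <db_name>")
schema_df = spark.createDataFrame(
    [c.name for c in df.schema.fields], StringType())
schema_df.write.mode("overwrite").saveAsTable("<db_name>.<table>")

# Step 3: write the CSV data to ADW through the Oracle datasource,
# then read it back to verify.
(df.write.format("oracle").options(**props)
   .option("dbtable", "<table>").option("connectionId", "<tnsname>")
   .mode("overwrite").save())
(spark.read.format("oracle").options(**props)
   .option("dbtable", "<table>").option("connectionId", "<tnsname>")
   .load().show())
```
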

