json_to_parquet-sample (#59)
* json_to_parquet-sample

Adding JSON to Parquet sample conversion and instructions for execution

* Update json_to_parquet.py
mariomiola authored Oct 18, 2023
1 parent 0a97e04 commit b0a76b8
Showing 2 changed files with 180 additions and 0 deletions.
70 changes: 70 additions & 0 deletions python/json_to_parquet/README.md
# Convert JSON data to Parquet

The initial phase in data processing typically involves extracting data from a source and transforming it into a format conducive to reporting and various analytical tasks. For instance, in a database environment, this may entail importing a flat file and establishing indexes. In Spark, the initial step usually involves the cleansing and conversion of data from JSON to the Parquet format. Parquet, being an optimized binary format with excellent read performance, is particularly well-suited for reporting and analytics purposes.
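
At its core, the conversion is only a few lines of PySpark: read the JSON input into a DataFrame and write it back out as Parquet. The sketch below is a minimal, standalone version of what `json_to_parquet.py` in this sample does; the paths are placeholders (in Data Flow they would be `oci://` URIs).

```python
from pyspark.sql import SparkSession

# Minimal sketch of the JSON-to-Parquet conversion; paths are placeholders.
spark = SparkSession.builder.appName("json_to_parquet_sketch").getOrCreate()

df = spark.read.json("input.json")                    # read JSON into a DataFrame
df.write.mode("overwrite").parquet("output.parquet")  # write it back out as Parquet

print("Converted {} rows.".format(df.count()))
```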

## Prerequisites

Before you begin:

* Ensure your tenancy is configured according to the [set up admin](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#set_up_admin) instructions.
* Know your object store namespace.
* Know the OCID of a compartment where you want to load your data and create applications.
* (Optional, but recommended): Ensure you are able to run the application locally using the [Code Editor](https://docs.public.oneportal.content.oci.oraclecloud.com/en-us/iaas/data-flow/using/code-editor-using.htm).

## Instructions

1. Upload a sample JSON file of your choice to object store.
2. Upload ```json_to_parquet.py``` to object store.
3. Create a Python Data Flow Application pointing to ```json_to_parquet.py```.
   * Refer to [the Data Flow documentation](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_data_flow_library.htm#create_pyspark_app) for the steps to create a PySpark Application.
   * The Spark application requires two arguments: `-i` (or `--input-path`) and `-o` (or `--output-path`). These must be OCI HDFS URIs pointing to your source JSON file and target output path. Put them in the Arguments field of the Data Flow Application; the sketch after this list shows how the script reads them.
   * Example Arguments field: `-i oci://sample@namespace/input.json -o oci://sample@namespace/output.parquet`
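
Whatever you put in the Arguments field reaches the script as ordinary command-line arguments. A small sketch of how `json_to_parquet.py` interprets them with argparse (the URIs are placeholders, and the optional `-p/--profile_name` flag is omitted here):

```python
import argparse

# Sketch of how the Arguments field maps onto the script's parameters.
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input-path", required=True)
parser.add_argument("-o", "--output-path", required=True)

args = parser.parse_args(
    ["-i", "oci://sample@namespace/input.json",
     "-o", "oci://sample@namespace/output.parquet"]
)
print(args.input_path)   # oci://sample@namespace/input.json
print(args.output_path)  # oci://sample@namespace/output.parquet
```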

## To use Code Editor to run json_to_parquet.py

Ensure you have reviewed [Running an Application with Code Editor](https://docs.public.oneportal.content.oci.oraclecloud.com/en-us/iaas/data-flow/using/code-editor-app-run.htm#code-editor-app-run) before you begin.

The json_to_parquet.py script is downloaded from the repository as soon as you create a project from a template and choose the Python template.

1. Within Code Editor, under the Dataflow OCI Plugin, navigate to the compartment where you want the test application to reside.
2. Right-click the Projects folder and select Create Project.
3. At the project prompt, choose "Create from a template".
4. Choose Python and then "json_to_parquet".
   1. This checks out the existing code from the GitHub repository and makes it available to run locally.
5. Right-click the newly created project, open the pulldown menu, and select Run Locally.
   1. Provide the local Python script (json_to_parquet.py) and the arguments from the example above; a quick local configuration check is sketched after this list.
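
When the script runs outside Data Flow, its `get_dataflow_spark_session()` helper loads your API-key configuration from `~/.oci/config` via the OCI Python SDK. A minimal sketch of a sanity check before a local run; it only verifies that the config the script will rely on can be loaded:

```python
import oci

# Confirm the OCI config the script uses locally can be loaded and is complete.
config = oci.config.from_file()      # defaults: ~/.oci/config, profile DEFAULT
oci.config.validate_config(config)   # raises if required keys are missing
print("Region:", config["region"])
```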

## To use OCI CLI to run the PySpark Application

Set all these variables based on your OCI tenancy.

```sh
COMPARTMENT_ID=ocid1.compartment.oc1..<your_compartment_id>
NAMESPACE=my_object_storage_namespace
BUCKET=my_bucket
INPUT_PATH=oci://$BUCKET@$NAMESPACE/json_file.json
OUTPUT_PATH=oci://$BUCKET@$NAMESPACE/output_parquet
```

Run these commands to upload all files.

```sh
oci os bucket create --name $BUCKET --compartment-id $COMPARTMENT_ID
oci os object put --bucket-name $BUCKET --file json_file.json
oci os object put --bucket-name $BUCKET --file json_to_parquet.py
```

Launch the Spark application to convert JSON to Parquet.

```sh
oci data-flow run submit \
--compartment-id $COMPARTMENT_ID \
--display-name "PySpark Convert JSON to Parquet" \
--execute "oci://$BUCKET@$NAMESPACE/json_to_parquet.py --input-path $INPUT_PATH --output-path $OUTPUT_PATH"
```
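
If you prefer Python over the CLI, the OCI Python SDK exposes the same operation through `DataFlowClient`. The sketch below is an assumption-laden equivalent: it assumes `CreateRunDetails` accepts an `execute` string the way the CLI's `run submit` does (check the SDK reference for your version); the bucket and namespace values mirror the variables above.

```python
import oci

# Sketch: submit the same run with the OCI Python SDK instead of the CLI.
config = oci.config.from_file()
client = oci.data_flow.DataFlowClient(config)

details = oci.data_flow.models.CreateRunDetails(
    compartment_id="ocid1.compartment.oc1..<your_compartment_id>",
    display_name="PySpark Convert JSON to Parquet",
    execute=("oci://my_bucket@my_object_storage_namespace/json_to_parquet.py "
             "--input-path oci://my_bucket@my_object_storage_namespace/json_file.json "
             "--output-path oci://my_bucket@my_object_storage_namespace/output_parquet"),
)
run = client.create_run(details).data
print("Run id:", run.id)
```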

Make note of the "id" field returned by the run submission. When the job is finished, view its output using:

```sh
oci data-flow run get-log --run-id <run_id> --name spark_application_stdout.log.gz --file -
```
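
To double-check the conversion, you can read the Parquet output back with Spark. A minimal sketch, assuming `json_to_parquet.py` is in your working directory so its session helper can be reused (it configures `oci://` access for local runs); the output path is a placeholder:

```python
# Read the Parquet output back and inspect it.
from json_to_parquet import get_dataflow_spark_session

spark = get_dataflow_spark_session(app_name="verify_parquet")
df = spark.read.parquet("oci://my_bucket@my_object_storage_namespace/output_parquet")
df.printSchema()
print("Row count:", df.count())
```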
110 changes: 110 additions & 0 deletions python/json_to_parquet/json_to_parquet.py
#!/usr/bin/env python3

# Copyright © 2023, Oracle and/or its affiliates.
# The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl.

import argparse
import os


from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--profile_name',
                        help='OCI Profile', required=False)
    parser.add_argument('-i', '--input-path',
                        help='Input file or file path', required=True)
    parser.add_argument('-o', '--output-path',
                        help='Output file path', required=True)

    args = parser.parse_args()

    # Set up Spark.
    spark_session = get_dataflow_spark_session()
    sql_context = SQLContext(spark_session)

    # Load our data.
    input_dataframe = sql_context.read.json(args.input_path)

    # Save the results as Parquet.
    input_dataframe.write.mode("overwrite").parquet(args.output_path)

    # Show on the console that something happened.
    print("Successfully converted {} rows to Parquet and wrote to {}.".format(
        input_dataframe.count(), args.output_path))


def get_dataflow_spark_session(
    app_name="DataFlow_JSON2Parquet", file_location=None, profile_name=None, spark_config={}
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI.
        try:
            import oci
        except ImportError:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except oci.exceptions.ConfigFileNotFound as e:
            print(
                "OCI config file not found. Please ensure the file exists and is accessible.")
            raise e
        except oci.exceptions.InvalidConfig as e:
            print("Invalid OCI config. Please check your configuration settings.")
            raise e
        except Exception as e:
            print("An unexpected error occurred.")
            raise e

        # Authenticate the OCI HDFS connector with the API key from the config file.
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(
                oci_config["region"]),
        )
        conf.set("fs.oci.client.apache.connection.closing.strategy",
                 "immediate")  # Large Files with partial reads
        spark_builder = SparkSession.builder.appName(
            app_name).config(conf=conf)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    return os.environ.get("HOME") == "/home/dataflow"


if __name__ == "__main__":
    main()
