Sample python script logging (#66)
* Add ability to use resource principal when interacting with Object Storage (OS) inside Spark

- Resource principal changes in the Spark session
- Example of interacting with Object Storage

* Update DataFlowSparkSession.java

* Create logging_sample.py

Initial logging sample

* Update logging_sample.py
mariomiola authored Sep 17, 2024
1 parent 8f9ff44 commit 2d3be5d
Showing 1 changed file with 182 additions and 0 deletions.
182 changes: 182 additions & 0 deletions python/logging/logging_sample.py
@@ -0,0 +1,182 @@
#!/usr/bin/env python3

# Copyright © 2024, Oracle and/or its affiliates.
# The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl.

# 1. Log4j Initialization:
# We initialize the Log4j logger using spark._jvm.org.apache.log4j.LogManager.getLogger(__name__). This routes PySpark log messages into Spark's JVM logging infrastructure, allowing you to track logs across the entire cluster.
# 2. Task Definitions:
# Task 1: Reads data from a JSON file.
# Task 2: Applies a simple transformation (e.g., adding a constant column).
# Task 3: Performs aggregation (e.g., counting rows by a specific column).
# Task 4: Saves the resulting DataFrame to a Parquet file.
# 3. Logging at Task Completion:
# For each task, we log a message before starting the task (e.g., "Task 1 started: Reading JSON data.") and upon task completion (e.g., "Task 1 completed: JSON data loaded successfully.").
# These log messages document the task orchestration and let you trace the execution flow in Spark's logs.
# 4. Error Handling:
# The script wraps the orchestration in a try-except-finally block so that any error during task execution is caught and logged. The handler records the error message together with the full Python stack trace (captured with traceback.format_exc(); the JVM Log4j logger does not accept Python-only logging keywords such as exc_info).
# If an error occurs, the script logs the exception and proceeds to the finally block, which ensures the Spark session is stopped.
# Running the Script on a Spark Cluster:
# Submit the Script: You can submit this script to a Spark cluster using the spark-submit command:
#   spark-submit --master <master-url> logging_sample.py -i <input-json-path> -o <output-parquet-path>
# (See the example invocations at the end of this header.)
# Check Logs: Messages generated by the Log4j logger appear in the Spark application logs,
# i.e., the driver and executor logs. This allows you to track the execution of each task and detect potential issues.
# Benefits of Using Log4j in PySpark:
# Cluster-Wide Logging: Logs are available from both the driver and executors, ensuring you can trace logs from distributed tasks.
# Integrated Monitoring: If your cluster uses a logging aggregation system, the logs will be centralized, aiding debugging and monitoring efforts.
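# Example invocations (illustrative only; the profile, bucket, namespace, and local paths
# below are placeholders, not part of this repository):
#   Local test, reading credentials from the default OCI config (~/.oci/config):
#     python3 logging_sample.py -p DEFAULT -i /tmp/input.json -o /tmp/output_parquet
#   Cluster submission reading from and writing to OCI Object Storage:
#     spark-submit logging_sample.py -i oci://<bucket>@<namespace>/input.json -o oci://<bucket>@<namespace>/output/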


import argparse
import os
import traceback

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--profile_name',
                        help='OCI Profile', required=False)
    parser.add_argument('-i', '--input-path',
                        help='Input JSON file or file path', required=True)
    parser.add_argument('-o', '--output-path',
                        help='Output Parquet file path', required=True)

    args = parser.parse_args()
    input_path = args.input_path
    output_path = args.output_path

    # Set up Spark, forwarding the optional OCI profile for local runs.
    spark = get_dataflow_spark_session(profile_name=args.profile_name)

    # Initialize the Log4j logger from the Spark JVM gateway.
    log4j_logger = spark._jvm.org.apache.log4j.LogManager.getLogger(__name__)
    log4j_logger.info("PySpark Application started.")
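    # Optional (illustrative): raise the driver log verbosity so the task messages
    # below are easy to spot; setLogLevel is a standard SparkContext API.
    # spark.sparkContext.setLogLevel("INFO")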


    # Define multiple tasks (Task 1 - Read JSON, Task 2 - Data Transformation, Task 3 - Data Aggregation, Task 4 - Save)
    def task1(input_path):
        log4j_logger.info("Task 1 started: Reading JSON data.")
        df = spark.read.json(input_path)
        log4j_logger.info("Task 1 completed: JSON data loaded successfully.")
        return df

    def task2(df):
        log4j_logger.info("Task 2 started: Transforming data.")
        # Example transformation: add a new column with a constant value
        transformed_df = df.withColumn("new_column", F.lit("constant_value"))
        log4j_logger.info("Task 2 completed: Data transformation done.")
        return transformed_df

    def task3(df):
        log4j_logger.info("Task 3 started: Aggregating data.")
        # Example aggregation: count the number of rows for each value in a column
        aggregated_df = df.groupBy("new_column").count()
        log4j_logger.info("Task 3 completed: Data aggregation done.")
        return aggregated_df

    def task4(df, output_path):
        log4j_logger.info("Task 4 started: Saving data to file.")
        # Save the DataFrame as Parquet
        df.write.mode("overwrite").parquet(output_path)
        log4j_logger.info("Task 4 completed: Data saved successfully.")

    # Orchestrate the tasks
    try:
        log4j_logger.info("Starting the orchestration of tasks.")

        # Execute Task 1: Read JSON
        df = task1(input_path)

        # Execute Task 2: Data Transformation
        transformed_df = task2(df)

        # Execute Task 3: Data Aggregation
        aggregated_df = task3(transformed_df)

        # Execute Task 4: Save Data
        task4(aggregated_df, output_path)

        log4j_logger.info("All tasks completed successfully.")

    except Exception as e:
        # The JVM Log4j logger takes plain arguments only, so include the Python
        # stack trace explicitly instead of using the logging module's exc_info.
        log4j_logger.error("An error occurred: {}\n{}".format(e, traceback.format_exc()))

    finally:
        # Stop Spark session
        spark.stop()
        log4j_logger.info("Spark session stopped.")

def get_dataflow_spark_session(
    app_name="DataFlow_JSON2Parquet", file_location=None, profile_name=None, spark_config={}
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI.
        try:
            import oci
        except ImportError:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except oci.exceptions.ConfigFileNotFound as e:
            print("OCI config file not found. Please ensure the file exists and is accessible.")
            raise e
        except oci.exceptions.InvalidConfig as e:
            print("Invalid OCI config. Please check your configuration settings.")
            raise e
        except Exception as e:
            print("An unexpected error occurred.")
            raise e

        # Configure API-key authentication for the OCI HDFS connector.
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        conf.set("fs.oci.client.apache.connection.closing.strategy",
                 "immediate")  # Large files with partial reads
        spark_builder = SparkSession.builder.appName(app_name).config(conf=conf)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session
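# Example (illustrative) local use of the optional spark_config parameter; the profile
# name and setting below are placeholders:
#   spark = get_dataflow_spark_session(
#       profile_name="DEFAULT",
#       spark_config={"spark.sql.shuffle.partitions": "8"},
#   )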


def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    return os.environ.get("HOME") == "/home/dataflow"


if __name__ == "__main__":
    main()
