From 2d3be5db5b493ce2b490754990cd97bbad4552f4 Mon Sep 17 00:00:00 2001
From: Mario Miola <108755375+mariomiola@users.noreply.github.com>
Date: Tue, 17 Sep 2024 09:33:12 -0700
Subject: [PATCH] Sample python script logging (#66)

* Add ability to use resource principal when interacting with Object Storage inside Spark

- Resource principal changes in the Spark session
- Example of interacting with Object Storage

* Update DataFlowSparkSession.java

* Create logging_sample.py

Initial logging sample

* Update logging_sample.py
---
 python/logging/logging_sample.py | 182 +++++++++++++++++++++++++++++++
 1 file changed, 182 insertions(+)
 create mode 100644 python/logging/logging_sample.py

diff --git a/python/logging/logging_sample.py b/python/logging/logging_sample.py
new file mode 100644
index 0000000..6ac3980
--- /dev/null
+++ b/python/logging/logging_sample.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python3
+
+# Copyright © 2024, Oracle and/or its affiliates.
+# The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl.
+
+# 1. Log4j Initialization:
+#    The Log4j logger is obtained with spark._jvm.org.apache.log4j.LogManager.getLogger(__name__).
+#    This routes the script's log messages through Spark's JVM logging infrastructure, so they can
+#    be traced across the entire cluster.
+# 2. Task Definitions:
+#    Task 1: Reads data from a JSON file.
+#    Task 2: Applies a simple transformation (adding a constant column).
+#    Task 3: Performs an aggregation (counting rows by a specific column).
+#    Task 4: Saves the resulting DataFrame to a Parquet file.
+# 3. Logging at Task Start and Completion:
+#    Each task logs a message before it starts (e.g., "Task 1 started: Reading JSON data.") and when
+#    it completes (e.g., "Task 1 completed: JSON data loaded successfully."). These messages make the
+#    task orchestration visible and let you trace the execution flow in Spark's logs.
+# 4. Error Handling:
+#    A try-except-finally block ensures that any error during task execution is caught and logged.
+#    The Python stack trace is formatted with traceback.format_exc() and included in the log message.
+#    After logging the exception, the script proceeds to the finally block, which stops the Spark session.
+#
+# Running the Script on a Spark Cluster:
+#    Submit the script with spark-submit, passing the input and output paths as arguments:
+#        spark-submit path_to_script.py --input-path <input> --output-path <output>
+#    Check Logs: Messages written through the Log4j logger appear in the Spark application logs,
+#    i.e., the driver and executor logs. This lets you track the execution of each task and detect
+#    potential issues.
+#
+# Benefits of Using Log4j in PySpark:
+#    Cluster-Wide Logging: Logs are available from both the driver and the executors, so you can
+#    trace logs from distributed tasks.
+#    Integrated Monitoring: If your cluster uses a log aggregation system, the logs are centralized,
+#    which aids debugging and monitoring.
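+#
+# Example invocation (a sketch; the oci:// bucket and namespace below are illustrative
+# placeholders, not values defined by this sample):
+#
+#     spark-submit logging_sample.py \
+#         --input-path oci://<bucket>@<namespace>/input/events.json \
+#         --output-path oci://<bucket>@<namespace>/output/events_parquet
+#
+# Locally, the optional --profile_name argument selects the OCI config profile used to reach
+# Object Storage; in OCI Data Flow the same --input-path/--output-path values are supplied as
+# application arguments.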
+
+
+import argparse
+import os
+import traceback
+
+from pyspark import SparkConf
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as F
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-p', '--profile_name',
+                        help='OCI Profile', required=False)
+    parser.add_argument('-i', '--input-path',
+                        help='Input JSON file or file path', required=True)
+    parser.add_argument('-o', '--output-path',
+                        help='Output Parquet file path', required=True)
+
+    args = parser.parse_args()
+    input_path = args.input_path
+    output_path = args.output_path
+
+    # Set up Spark, honoring the optional OCI profile passed on the command line.
+    spark = get_dataflow_spark_session(profile_name=args.profile_name)
+
+    # Initialize the Log4j logger through the SparkSession's JVM gateway.
+    log4j_logger = spark._jvm.org.apache.log4j.LogManager.getLogger(__name__)
+    log4j_logger.info("PySpark Application started.")
+
+    # Define the tasks (Task 1 - read JSON, Task 2 - transform, Task 3 - aggregate, Task 4 - save).
+    def task1(input_path):
+        log4j_logger.info("Task 1 started: Reading JSON data.")
+        df = spark.read.json(input_path)
+        log4j_logger.info("Task 1 completed: JSON data loaded successfully.")
+        return df
+
+    def task2(df):
+        log4j_logger.info("Task 2 started: Transforming data.")
+        # Example transformation: add a new column with a constant value.
+        transformed_df = df.withColumn("new_column", F.lit("constant_value"))
+        log4j_logger.info("Task 2 completed: Data transformation done.")
+        return transformed_df

+    def task3(df):
+        log4j_logger.info("Task 3 started: Aggregating data.")
+        # Example aggregation: count the number of rows for each value in a column.
+        aggregated_df = df.groupBy("new_column").count()
+        log4j_logger.info("Task 3 completed: Data aggregation done.")
+        return aggregated_df
+
+    def task4(df, output_path):
+        log4j_logger.info("Task 4 started: Saving data to file.")
+        # Save the DataFrame to a Parquet file.
+        df.write.mode("overwrite").parquet(output_path)
+        log4j_logger.info("Task 4 completed: Data saved successfully.")
+
+    # Orchestrate the tasks.
+    try:
+        log4j_logger.info("Starting the orchestration of tasks.")
+
+        # Execute Task 1: Read JSON
+        df = task1(input_path)
+
+        # Execute Task 2: Data Transformation
+        transformed_df = task2(df)
+
+        # Execute Task 3: Data Aggregation
+        aggregated_df = task3(transformed_df)
+
+        # Execute Task 4: Save Data
+        task4(aggregated_df, output_path)
+
+        log4j_logger.info("All tasks completed successfully.")
+
+    except Exception as e:
+        # The JVM Log4j logger does not accept Python's exc_info keyword, so the
+        # stack trace is formatted explicitly and appended to the message.
+        log4j_logger.error(
+            "An error occurred: {}\n{}".format(e, traceback.format_exc()))
+
+    finally:
+        # Stop the Spark session.
+        spark.stop()
+        log4j_logger.info("Spark session stopped.")
+
+
+def get_dataflow_spark_session(
+    app_name="DataFlow_JSON2Parquet", file_location=None, profile_name=None, spark_config={}
+):
+    """
+    Get a Spark session in a way that supports running locally or in Data Flow.
+    """
+    if in_dataflow():
+        spark_builder = SparkSession.builder.appName(app_name)
+    else:
+        # Import OCI.
+        try:
+            import oci
+        except ImportError:
+            raise Exception(
+                "You need to install the OCI python library to test locally"
+            )
+
+        # Use defaults for anything unset.
+        if file_location is None:
+            file_location = oci.config.DEFAULT_LOCATION
+        if profile_name is None:
+            profile_name = oci.config.DEFAULT_PROFILE
+
+        # Load the config file.
+        try:
+            oci_config = oci.config.from_file(
+                file_location=file_location, profile_name=profile_name
+            )
+        except oci.exceptions.ConfigFileNotFound as e:
+            print("OCI config file not found. Please ensure the file exists and is accessible.")
+            raise e
+        except oci.exceptions.InvalidConfig as e:
+            print("Invalid OCI config. Please check your configuration settings.")
+            raise e
+        except Exception as e:
+            print("An unexpected error occurred.")
+            raise e
+
+        conf = SparkConf()
+        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
+        conf.set("fs.oci.client.auth.userId", oci_config["user"])
+        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
+        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
+        conf.set(
+            "fs.oci.client.hostname",
+            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
+        )
+        # Large files with partial reads.
+        conf.set("fs.oci.client.apache.connection.closing.strategy", "immediate")
+        spark_builder = SparkSession.builder.appName(app_name).config(conf=conf)
+
+    # Add in extra configuration.
+    for key, val in spark_config.items():
+        spark_builder.config(key, val)
+
+    # Create the Spark session.
+    session = spark_builder.getOrCreate()
+    return session
+
+
+def in_dataflow():
+    """
+    Determine if we are running in OCI Data Flow by checking the environment.
+    """
+    return os.environ.get("HOME") == "/home/dataflow"
+
+
+if __name__ == "__main__":
+    main()
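+
+
+# Local test sketch (an assumption, not part of the sample's documented workflow): with Apache
+# Spark and the OCI Python SDK installed, and the named profile present in ~/.oci/config, the
+# script can be exercised against local files; the paths below are placeholders:
+#     spark-submit logging_sample.py -p DEFAULT -i ./sample_input.json -o ./sample_output_parquet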