import logging

from flask import Flask, request, jsonify
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# Initialize Flask app
app = Flask(__name__)

# Initialize SparkSession (one shared session for the process lifetime)
spark = SparkSession.builder.appName("microservice").getOrCreate()

# Reduce Spark log verbosity
spark.sparkContext.setLogLevel("WARN")

# Configure application logging to the console
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler()  # Log to console
    ],
)
logger = logging.getLogger(__name__)


@app.route("/process", methods=["POST"])
def process_data():
    """Receive a JSON array of records and return per-gender salary statistics.

    Expects a JSON list of objects, each with at least "gender" and "salary"
    keys (schema is inferred by Spark — TODO confirm with callers).

    Returns:
        200 with a JSON list of {"gender", "average_salary", "count"} objects;
        400 with {"error": ...} when the request body is missing/empty/invalid JSON;
        500 with {"error": ...} when Spark processing fails.
    """
    logger.info("Received request to /process")

    # silent=True returns None instead of raising on a malformed or absent
    # body, so client mistakes surface as a 400 rather than an opaque 500
    # from spark.createDataFrame(None).
    data = request.get_json(silent=True)
    if not data:
        logger.warning("Rejected request with empty or invalid JSON body")
        return jsonify({"error": "Request body must be a non-empty JSON array"}), 400

    # Lazy %-style args avoid formatting work when INFO logging is disabled.
    # NOTE(review): this logs the full payload — consider redacting if the
    # data may contain sensitive fields.
    logger.info("Request data: %s", data)

    try:
        # Create temporary DataFrame from the posted records.
        df = spark.createDataFrame(data)

        # Aggregate: average salary and record count per gender.
        result = df.groupBy("gender").agg(
            avg("salary").alias("average_salary"),
            count("*").alias("count"),
        ).collect()

        # Convert Spark Row objects to plain dicts for JSON serialization.
        result_json = [
            {
                "gender": row["gender"],
                "average_salary": row["average_salary"],
                "count": row["count"],
            }
            for row in result
        ]

        logger.info("Analysis results: %s", result_json)
        return jsonify(result_json), 200
    except Exception as e:
        # Top-level service boundary: log the full traceback
        # (logger.exception == logger.error(..., exc_info=True))
        # and return a structured JSON error payload.
        logger.exception("Error during processing")
        return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
    logger.info("Starting Flask app on port 5000")
    app.run(host="0.0.0.0", port=5000)