Skip to content

Commit

Permalink
Update app.py
Browse files Browse the repository at this point in the history
  • Loading branch information
zihanxiao23 authored Dec 9, 2024
1 parent 8b1230a commit 66123a9
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,55 @@
import logging
from flask import Flask, request, jsonify
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count

# Flask application object; the route handlers in this module attach to it.
app = Flask(__name__)

# One SparkSession for the whole process (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .appName("microservice")
    .getOrCreate()
)

# Keep Spark's own output down to warnings and above.
spark.sparkContext.setLogLevel("WARN")

# Application logging: timestamped INFO-and-above records through a single
# stream handler (console).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

@app.route("/process", methods=["POST"])
def process_data():
    """Aggregate posted salary records by gender using Spark.

    The request body must be a JSON array of objects. The records are loaded
    into a Spark DataFrame and grouped by the ``gender`` column; for each
    group the mean of ``salary`` and the number of rows are computed.

    Returns:
        A ``(response, status)`` tuple:

        * 200 with a JSON list of objects carrying ``gender``,
          ``average_salary`` and ``count`` (an empty list when the posted
          array is empty);
        * 400 with ``{"error": ...}`` when the body is missing, is not valid
          JSON, or is not an array of objects;
        * 500 with ``{"error": ...}`` when the Spark processing itself fails.
    """
    logger.info("Received request to /process")

    # silent=True makes get_json return None instead of raising on a missing
    # or malformed JSON body, so client mistakes are answered with 400 below
    # rather than falling into the generic handler and surfacing as a 500.
    data = request.get_json(silent=True)
    logger.info("Request data: %s", data)

    is_record_list = isinstance(data, list) and all(
        isinstance(record, dict) for record in data
    )
    if not is_record_list:
        logger.warning("Rejected request: body is not a JSON array of objects")
        return (
            jsonify({"error": "Request body must be a JSON array of objects"}),
            400,
        )

    if not data:
        # Spark cannot infer a schema from an empty dataset; no records means
        # no groups, so answer with an empty result instead of an error.
        logger.info("Analysis results: %s", [])
        return jsonify([]), 200

    try:
        # Create temporary DataFrame
        df = spark.createDataFrame(data)

        # Perform simple data analysis
        result = df.groupBy("gender").agg(
            avg("salary").alias("average_salary"),
            count("*").alias("count"),
        ).collect()

        # Convert results to JSON format
        result_json = [
            {
                "gender": row["gender"],
                "average_salary": row["average_salary"],
                "count": row["count"],
            }
            for row in result
        ]
        logger.info("Analysis results: %s", result_json)

        return jsonify(result_json), 200
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback and report
        # the failure to the caller.
        # NOTE(review): str(e) exposes internal error text to the client;
        # kept for backward compatibility with existing consumers.
        logger.exception("Error during processing: %s", e)
        return jsonify({"error": str(e)}), 500

if __name__ == "__main__":
    # Direct-execution entry point: announce startup, then serve the app on
    # all network interfaces.
    logger.info("Starting Flask app on port 5000")
    app.run(port=5000, host="0.0.0.0")

0 comments on commit 66123a9

Please sign in to comment.