-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path
50 lines (40 loc) · 1.65 KB
/
a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Import necessary libraries
import pandas as pd
import pandas_datareader as pdr
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, corr
def download_sp_data(start_date, end_date):
    """Fetch daily S&P 500 (^GSPC) quotes from Yahoo Finance.

    Args:
        start_date: inclusive start of the range ('YYYY-MM-DD' or datetime).
        end_date: inclusive end of the range.

    Returns:
        A pandas DataFrame of daily quotes indexed by date.
    """
    # NOTE(review): pandas_datareader's Yahoo backend has broken repeatedly
    # as Yahoo changes its endpoints — confirm this call still works here.
    return pdr.get_data_yahoo('^GSPC', start=start_date, end=end_date)
def load_data_into_spark(sp_data, spark):
    """Convert the downloaded S&P pandas frame into a Spark DataFrame.

    The pandas date index is promoted to a column, and only the trading
    date and closing price are kept, renamed to `date` and `sp_close`.

    Args:
        sp_data: pandas DataFrame as returned by download_sp_data.
        spark: active SparkSession used for the conversion.

    Returns:
        Spark DataFrame with columns `date` and `sp_close`.
    """
    with_index = sp_data.reset_index()
    frame = spark.createDataFrame(with_index)
    return frame.selectExpr("Date as date", "Close as sp_close")
def main():
    """Correlate S&P 500 daily closes with (dummy) customer redemptions.

    Loads ^GSPC closes from Yahoo, joins them by calendar date with a small
    in-line redemptions dataset, and prints the Pearson correlation between
    the closing price and redemption counts.
    """
    # Initialize Spark Session
    spark = SparkSession.builder.appName("CorrelationAnalysis").getOrCreate()
    try:
        # Define the date range for the data
        start_date = '2023-01-01'
        end_date = datetime.now().strftime('%Y-%m-%d')

        # Step 1: Load S&P 500 data
        sp_data = download_sp_data(start_date, end_date)
        sp_df = load_data_into_spark(sp_data, spark)
        # BUG FIX: sp_df's `date` column is a timestamp (it comes from the
        # pandas DatetimeIndex), while the redemption dates below are plain
        # strings — the original equi-join on "date" therefore matched no
        # rows and the correlation was null. Normalize both sides to DATE.
        sp_df = sp_df.selectExpr("cast(date as date) as date", "sp_close")

        # Step 2: Dummy load for redemption data (for demonstration purposes)
        # Normally, you would load this from a data source such as a CSV file
        redemptions_df = spark.createDataFrame([
            (1, '2023-01-01', 20),
            (2, '2023-01-02', 30),
            (1, '2023-01-03', 40),
        ], ["cust_id", "date", "MR_redemptions"])
        redemptions_df = redemptions_df.selectExpr(
            "cust_id", "cast(date as date) as date", "MR_redemptions")

        # Step 3: Join the data on the (now type-consistent) date field
        combined_df = redemptions_df.join(sp_df, "date")

        # Step 4: Perform correlation analysis
        correlation_result = combined_df.select(
            corr("sp_close", "MR_redemptions").alias("correlation"))
        correlation_result.show()
    finally:
        # Always release the Spark session, even if the download or join fails.
        spark.stop()


if __name__ == "__main__":
    main()