# BgGpt_main_v2.py
#TODO: Separate fetch_news into several functions; it is doing too much. One function that randomly selects a user agent and passes it into fetch_news as a parameter; fetch_news then saves the response as HTML. Then another function to parse the response into soup, and another function to save the articles to the database.
#To refactor the code according to your request, we'll break down the `display_articles` function into smaller, more manageable functions. This will make the code more modular and easier to maintain. We'll also ensure that the main entry point (`__main__`) calls these functions in the correct sequence.
#Here's the refactored code:
import requests
from bs4 import BeautifulSoup
import streamlit as st
import pandas as pd
import sqlite3
import datetime
import os
import re # Import regex
import random
from random import randint
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
# Download necessary NLTK resources
nltk.download('punkt')
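# Note: on newer NLTK releases the sentence/word tokenizers may also require the
# 'punkt_tab' resource; if tokenization fails, try nltk.download('punkt_tab').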
# Custom Bulgarian stopwords list
bulgarian_stopwords = set([
"и", "в", "на", "с", "за", "не"
])
# Constants
BASE_URL = "https://www.mediapool.bg/"
DATABASE_NAME = 'articles.db'
FOLDER_NAME = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
os.makedirs(FOLDER_NAME, exist_ok=True)
def sanitize_title(title):
return re.sub(r'[\\/*?:"<>|]', '', title)[:50]
def setup_database():
with sqlite3.connect(DATABASE_NAME) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
link TEXT NOT NULL,
summary TEXT,
views INTEGER DEFAULT 0,
retrieval_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
def fetch_news(articles_folderpath):
# Define the user agents
user_agents = {
'firefox': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0',
'chrome': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36',
'edge': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.902.78 Safari/537.36 Edg/92.0.902.78',
'safari': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15',
'opera': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36 OPR/64.0.3417.83'
}
# Select a random user agent
browser = random.choice(list(user_agents.keys()))
headers = {'User-Agent': user_agents[browser]}
    # Set up the path for saving the raw HTML response; create the folder if needed
    articles_folderpath = os.path.join(os.getcwd(), articles_folderpath)
    os.makedirs(articles_folderpath, exist_ok=True)
    file2save = 'response.html'
    article_filepath = os.path.join(articles_folderpath, file2save)
    # Request the news page
    response = None
    try:
        response = requests.get(BASE_URL, headers=headers, timeout=10)
        if response.status_code == 200:
            print(f"response.status_code: {response.status_code}")
            # Save the response content as HTML
            save_html(response, article_filepath)
    except Exception as e:
        print(f"Error fetching news: {e}")
    return response
#In this version of the function, a random user agent is selected from the user_agents dictionary inside fetch_news rather than being passed in as an argument, which avoids the earlier TypeError.
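# A possible next step for the TODO at the top of the file: factor the user-agent
# choice into its own helper and pass the resulting headers into fetch_news.
# This is only a sketch; get_random_user_agent is a hypothetical name and is not
# wired into fetch_news yet.
def get_random_user_agent():
    """Return a headers dict with a randomly chosen browser User-Agent string."""
    user_agent_strings = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15',
    ]
    return {'User-Agent': random.choice(user_agent_strings)}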
def save_html(response, filepath):
try:
with open(filepath, 'w', encoding='utf-8') as f:
f.write(response.text)
print (f"#response.html successfully written out")
except Exception as e:
print(f"Error writing HTML file: {e}")
def parse_and_save_soup(response, soup_filepath):
try:
soup = BeautifulSoup(response.text, 'html.parser')
with open(soup_filepath, 'w', encoding='utf-8') as f:
f.write(soup.get_text())
print (f"soup content written out from soup.get_text(): {soup.get_text()}")
return soup
except Exception as e:
print(f"Error writing soup file: {e}")
return None
def save_articles_to_db(soup):
try:
news_items = soup.find_all('article', limit=10)
with sqlite3.connect(DATABASE_NAME) as conn:
conn.execute("DELETE FROM articles WHERE date(retrieval_date) = date('now')")
for item in news_items:
a_tag = item.find('a')
                if a_tag and a_tag.get('href'):
title, summary = extract_title_and_summary(a_tag['href'])
views = randint(1, 100)
conn.execute("INSERT INTO articles (title, link, summary, views, retrieval_date) VALUES (?, ?, ?, ?, ?)",
(title, a_tag['href'], summary, views, datetime.datetime.now()))
conn.commit()
    except Exception as e:
        st.error(f"Error saving articles to the database: {e}")
def extract_title_and_summary(url):
headers = {'User-Agent': 'Mozilla/5.0'}
try:
response = requests.get(url, headers=headers)
if response.ok:
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.find('title').text if soup.find('title') else "No title found"
article_text = ' '.join([p.text for p in soup.find_all('p')])
sentences = sent_tokenize(article_text)
summary = sentences[0][:100] if sentences else "No summary available"
            tokens = word_tokenize(article_text)  # Tokenize the article text (tokens are not used further yet)
return title, summary
except Exception as e:
return "Failed to extract", str(e)
def get_popular_articles(selected_date):
with sqlite3.connect(DATABASE_NAME) as conn:
return pd.read_sql("""
SELECT title, link, summary
FROM articles
WHERE date(retrieval_date) = date(?)
ORDER BY views DESC
LIMIT 5
""", conn, params=(selected_date,))
def display_articles():
st.title("TOP 10 News Summary from MediaPool")
setup_database()
selected_date = st.sidebar.date_input("Select a date", datetime.date.today())
if st.sidebar.button("Fetch News Now"):
articles_folderpath = 'articles'
soup_filepath = 'response_soup.txt'
response = fetch_news(articles_folderpath)
if response is not None:
soup = parse_and_save_soup(response, soup_filepath)
if soup is not None:
save_articles_to_db(soup)
popular_articles = get_popular_articles(selected_date)
if not popular_articles.empty:
st.sidebar.write(f"Most Popular Articles for {selected_date}:")
for index, row in popular_articles.iterrows():
st.sidebar.write(f"{row['title']} - {row['link']}")
# Display today's articles
with sqlite3.connect(DATABASE_NAME) as conn:
todays_articles = pd.read_sql("SELECT title, link, summary FROM articles WHERE date(retrieval_date) = date('now')", conn)
if not todays_articles.empty:
todays_articles.index = range(1, len(todays_articles) + 1)
for _, row in todays_articles.iterrows():
with st.expander(f"{row['title']}"):
st.write(f"Накратко: {row['summary']}")
st.write(f"Прочетете цялата статия: [link]({row['link']})")
else:
st.write("Не са намерени статии.")
if __name__ == "__main__":
display_articles()
#This refactored code maintains the original functionality but organizes the code into smaller, more manageable functions. The main entry point (`__main__`) calls the `display_articles` function, which in turn calls other functions as needed. This structure makes the code easier to understand and maintain.
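# A minimal way to run this app, assuming Streamlit is installed:
#   streamlit run BgGpt_main_v2.py
# The fetch pipeline can also be exercised outside Streamlit, for example:
#   setup_database()
#   response = fetch_news('articles')
#   if response is not None:
#       soup = parse_and_save_soup(response, 'response_soup.txt')
#       if soup is not None:
#           save_articles_to_db(soup)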