Substituting pandas operations with direct Elasticsearch search queries using the Python Elasticsearch Client #48

Merged · 1 commit · Jan 31, 2024
183 changes: 92 additions & 91 deletions generate_homepage_xml.py
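For context on the substitution pattern: the old code pulled every document for a domain into a pandas DataFrame (`fetch_all_data_for_url`) and filtered it client-side; the new helpers such as `fetch_data_in_date_range` push that filtering to Elasticsearch itself. Below is a minimal, hypothetical sketch of what such a date-range fetch could look like with the official `elasticsearch` Python client (v8-style keyword arguments). The field names (`domain.keyword`, `created_at`), the localhost endpoint, and the `size` cap are assumptions for illustration; the repository's actual implementation lives in `src/elasticsearch_utils.ElasticSearchClient` and is not part of this diff.

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed endpoint

def fetch_data_in_date_range(es_index: str, start_date: str, end_date: str, domain: str):
    """Return raw ES hits for one domain within a created_at range (illustrative sketch only)."""
    resp = es.search(
        index=es_index,
        size=10_000,  # assumed upper bound; a production client would paginate or scroll
        query={
            "bool": {
                "filter": [
                    {"term": {"domain.keyword": domain}},
                    {"range": {"created_at": {"gte": start_date, "lte": end_date}}},
                ]
            }
        },
    )
    return resp["hits"]["hits"]
```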
@@ -1,5 +1,4 @@
import random
import pandas as pd
import time
import traceback
from datetime import datetime, timedelta
@@ -8,6 +7,7 @@
import sys
import warnings
import json
from tqdm import tqdm

from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient
@@ -21,25 +21,29 @@
def page_data_handling(data_list: list, get_unique_per_dev=False):
page_data = []
collected_dev_data = []
for data in data_list:
individual_file_exists, combined_file_exists = gen.check_local_xml_files_exists(data,
look_for_combined_summary_file=True)
if not individual_file_exists:
this_doc_id = data['_source']['id']
logger.info(f"individual summary file does not exist for id: {this_doc_id}")
this_doc_data = elastic_search.fetch_data_based_on_id(es_index=ES_INDEX, id_str=this_doc_id)
logger.info(f"Total docs found: {len(this_doc_data)}")
xml_gen.start(dict_data=this_doc_data, url=data['_source']['domain'])
logger.info(f"xml generation complete")
entry_data = gen.create_single_entry(data, look_for_combined_summary=True)

if get_unique_per_dev:
if entry_data['dev_name'] not in collected_dev_data:
collected_dev_data.append(entry_data['dev_name'])
logger.info(f"collected data for: {collected_dev_data}")
for data in tqdm(data_list):
try:
individual_file_exists, combined_file_exists = gen.check_local_xml_files_exists(data,
look_for_combined_summary_file=True)
if not individual_file_exists:
this_doc_id = data['_source']['id']
logger.info(f"individual summary file does not exist for id: {this_doc_id}")
this_doc_data = elastic_search.fetch_data_based_on_id(es_index=ES_INDEX, id_str=this_doc_id)
logger.info(f"Total docs found: {len(this_doc_data)}")
xml_gen.start(dict_data=this_doc_data, url=data['_source']['domain'])
logger.info(f"xml generation complete")
entry_data = gen.create_single_entry(data, look_for_combined_summary=True)

if get_unique_per_dev:
if entry_data['dev_name'] not in collected_dev_data:
collected_dev_data.append(entry_data['dev_name'])
logger.info(f"collected data for: {collected_dev_data}")
page_data.append(entry_data)
else:
page_data.append(entry_data)
else:
page_data.append(entry_data)
except Exception as ex:
logger.error(
f"Error occurred for doc id: {data['_source']['id']}\n{ex} \n{traceback.format_exc()}")
return page_data


@@ -78,7 +82,6 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
logger.info(f"Working on URL: {dev_url}")
fetch_today_in_history = True

all_data_df, all_data_list = elastic_search.fetch_all_data_for_url(ES_INDEX, url=dev_url)
data_list = elastic_search.extract_data_from_es(
ES_INDEX, dev_url, start_date_str, current_date_str, exclude_combined_summary_docs=True
)
@@ -87,9 +90,10 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):

seen_titles = set()

# top active posts
active_posts_data = elastic_search.filter_top_active_posts(es_results=data_list, top_n=10,
all_data_df=all_data_df)
# TOP ACTIVE POSTS
active_posts_data = elastic_search.filter_top_active_posts(
es_results=data_list, top_n=10
)

active_posts_data_counter = 0
for data in active_posts_data:
@@ -102,33 +106,31 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
seen_titles.add(title)

# get the first post's info of this title
df_title = all_data_df.loc[(all_data_df['title'] == title) & (all_data_df['domain'] == dev_url)]
df_title.sort_values(by='created_at', inplace=True)
original_post = df_title.iloc[0].to_dict()

counts, contributors = elastic_search.fetch_contributors_and_threads(title=title, domain=dev_url,
df=df_title)

for i in all_data_list:
if i['_source']['title'] == original_post['title'] and i['_source']['domain'] == original_post[
'domain'] and i['_source']['authors'] == original_post['authors'] and i['_source']['created_at'] == \
original_post['created_at'] and i['_source']['url'] == original_post['url']:
for author in i['_source']['authors']:
contributors.remove(author)
i['_source']['n_threads'] = counts
i['_source']['contributors'] = contributors
i['_source']['dev_name'] = dev_name
active_data_list.append(i)
active_posts_data_counter += 1
break
original_post = elastic_search.get_earliest_posts_by_title(
es_index=ES_INDEX, url=dev_url, title=title
)

counts, contributors = elastic_search.es_fetch_contributors_and_threads(
es_index=ES_INDEX, title=title, domain=dev_url
)

# if you want to show the first post of each selected title,
# then do the below operations on - 'original_post', else on 'data'
for author in original_post['_source']['authors']:
contributors.remove(author)
original_post['_source']['n_threads'] = counts
original_post['_source']['contributors'] = contributors
original_post['_source']['dev_name'] = dev_name
active_data_list.append(original_post)
active_posts_data_counter += 1

logger.success(f"Number of active posts collected: {len(active_data_list)}, for URL: {dev_url}")

# top recent posts
# TOP RECENT POSTS
recent_data_post_counter = 0
recent_posts_data = elastic_search.filter_top_recent_posts(es_results=data_list, top_n=20)
for data in recent_posts_data:

for data in recent_posts_data:
# if preprocess body text not longer than token_threshold, skip that post
if not gen.is_body_text_long(data=data, sent_threshold=2):
logger.info(f"skipping: {data['_source']['title']} - {data['_source']['url']}")
@@ -140,10 +142,12 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
seen_titles.add(title)
if recent_data_post_counter >= 3:
break
counts, contributors = elastic_search.fetch_contributors_and_threads(title=title, domain=dev_url,
df=all_data_df)
authors = data['_source']['authors']
for author in authors:

counts, contributors = elastic_search.es_fetch_contributors_and_threads(
es_index=ES_INDEX, title=title, domain=dev_url
)

for author in data['_source']['authors']:
contributors.remove(author)
data['_source']['n_threads'] = counts
data['_source']['contributors'] = contributors
@@ -159,13 +163,12 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
continue

title = data['_source']['title']
seen_titles.add(title)
if recent_data_post_counter >= 3:
break
counts, contributors = elastic_search.fetch_contributors_and_threads(title=title, domain=dev_url,
df=all_data_df)
authors = data['_source']['authors']
for author in authors:
counts, contributors = elastic_search.es_fetch_contributors_and_threads(
es_index=ES_INDEX, title=title, domain=dev_url
)
for author in data['_source']['authors']:
contributors.remove(author)
data['_source']['n_threads'] = counts
data['_source']['contributors'] = contributors
@@ -175,7 +178,7 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):

logger.success(f"Number of recent posts collected: {len(recent_data_list)}, for URL: {dev_url}")

# today in history posts
# TODAY IN HISTORY POSTS
logger.info(f"fetching 'Today in history' posts... ")

if not random_years_ago:
@@ -191,54 +194,52 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):

default_days_to_look_back = 6
loop_counter = 1
docs_counter = 0

while fetch_today_in_history:
days_to_look_back = default_days_to_look_back * loop_counter
selected_random_date = current_date - timedelta(days=365 * random_years_ago)

start_of_time = selected_random_date - timedelta(days=selected_random_date.weekday())
end_of_time = start_of_time + timedelta(days=days_to_look_back)
logger.info(f"collecting the data for {days_to_look_back} days ... ::: Start of week: {start_of_time}, "

start_of_time_str = start_of_time.strftime("%Y-%m-%dT%H:%M:%S")
end_of_time_str = end_of_time.strftime("%Y-%m-%dT%H:%M:%S")

logger.info(f"collecting the data from {days_to_look_back} days range ... || Start of week: {start_of_time} | "
f"End of week: {end_of_time}")

all_data_df['created_at'] = pd.to_datetime(all_data_df['created_at'], errors='coerce')
all_data_df['created_at_'] = all_data_df['created_at'].dt.tz_convert(None)
df_selected_threads = all_data_df.loc[(all_data_df['created_at_'] >= start_of_time) &
(all_data_df['created_at_'] <= end_of_time)]
logger.info(f"Shape of df_selected_threads: {df_selected_threads.shape}")

if len(df_selected_threads) > 0:
for idx, row in df_selected_threads.iterrows():
this_title = row['title']
this_created_at = row['created_at'].to_pydatetime().replace(tzinfo=None)
this_type = row['type']

if this_type == 'original_post':
logger.info(f"collecting an original thread created at {this_created_at}")
df_title = all_data_df.loc[
(all_data_df['title'] == this_title) & (all_data_df['domain'] == dev_url)]
df_title.sort_values(by='created_at', inplace=True)
logger.info(f"No. of posts found for title: '{this_title}' ::: {df_title.shape}")

counts, contributors = elastic_search.fetch_contributors_and_threads(title=this_title,
domain=dev_url,
df=df_title)
selected_threads = elastic_search.fetch_data_in_date_range(
es_index=ES_INDEX,
start_date=start_of_time_str,
end_date=end_of_time_str,
domain=dev_url
)

if len(selected_threads) > 0:
for doc in selected_threads:
doc_title = doc['_source']['title']
doc_created_at = doc['_source']['created_at']

if doc['_source']['type'] == 'original_post':

counts, contributors = elastic_search.es_fetch_contributors_and_threads(
es_index=ES_INDEX, title=doc_title, domain=dev_url
)

if counts < 5:
logger.info(f"No. of replies are less than 5, skipping it... ")
continue

for d in all_data_list:
if d['_source']['title'] == this_title and d['_source']['domain'] == row['domain'] and \
d['_source']['authors'] == row['authors'] and d['_source']['url'] == row['url']:
if contributors:
for author in d['_source']['authors']:
contributors.remove(author)
d['_source']['n_threads'] = counts
d['_source']['contributors'] = contributors
d['_source']['dev_name'] = dev_name
today_in_history_data_list.append(d)
fetch_today_in_history = False
break
if contributors:
for author in doc['_source']['authors']:
contributors.remove(author)
doc['_source']['n_threads'] = counts
doc['_source']['contributors'] = contributors
doc['_source']['dev_name'] = dev_name
today_in_history_data_list.append(doc)
logger.info(f"collected doc created on: {doc_created_at} || TITLE: {doc_title}")
fetch_today_in_history = False
break
loop_counter += 1

# add history data from yesterday's homepage.json
@@ -328,7 +329,7 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}")
time.sleep(delay)
count += 1
if count > 3:
if count > 1:
sys.exit(f"{ex}")
else:
logger.info("No change in 'Recent' or 'Active' posts.")