diff --git a/.github/workflows/homepage_json_gen_cron_job.yml b/.github/workflows/homepage_json_gen_cron_job.yml
index d41529f38..b6493212b 100644
--- a/.github/workflows/homepage_json_gen_cron_job.yml
+++ b/.github/workflows/homepage_json_gen_cron_job.yml
@@ -2,7 +2,7 @@ name: Daily Python Homepage Update Script Execution

 on:
   schedule:
-    - cron: "0 3 * * *"
+    - cron: "0 3 * * *" # every day at 03:00 AM UTC

   workflow_dispatch:
   repository_dispatch:
diff --git a/.github/workflows/push_combined_summary_to_es_cron_job.yml b/.github/workflows/push_combined_summary_to_es_cron_job.yml
index 563d32287..67865514a 100644
--- a/.github/workflows/push_combined_summary_to_es_cron_job.yml
+++ b/.github/workflows/push_combined_summary_to_es_cron_job.yml
@@ -2,7 +2,7 @@ name: Daily Push Combined Summary From XML Files to ES INDEX

 on:
   schedule:
-    - cron: "30 2 * * *"
+    - cron: "30 2 * * *" # every day at 02:30 AM UTC

   workflow_dispatch:
   repository_dispatch:
diff --git a/.github/workflows/push_summary_to_elasticsearch_cron_job.yml b/.github/workflows/push_summary_to_elasticsearch_cron_job.yml
index dd285c73a..285118d93 100644
--- a/.github/workflows/push_summary_to_elasticsearch_cron_job.yml
+++ b/.github/workflows/push_summary_to_elasticsearch_cron_job.yml
@@ -2,7 +2,7 @@ name: Daily Push Summary From XML Files to ES INDEX

 on:
   schedule:
-    - cron: "0 2 * * *"
+    - cron: "0 2 * * *" # every day at 02:00 AM UTC

   workflow_dispatch:
   repository_dispatch:
diff --git a/.github/workflows/xmls_gen_cron_job.yml b/.github/workflows/xmls_gen_cron_job.yml
index 42e9d2394..e1dc3e579 100644
--- a/.github/workflows/xmls_gen_cron_job.yml
+++ b/.github/workflows/xmls_gen_cron_job.yml
@@ -1,7 +1,7 @@
 name: Daily XML Generation Script
 on:
   schedule:
-    - cron: "0 1 * * *"
+    - cron: "0 1 * * *" # every day at 01:00 AM UTC

   workflow_dispatch:
   repository_dispatch:
diff --git a/generate_homepage_xml.py b/generate_homepage_xml.py
index 2a3f017f1..ec79e966d 100644
--- a/generate_homepage_xml.py
+++ b/generate_homepage_xml.py
@@ -145,7 +145,7 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
             counts, contributors = elastic_search.es_fetch_contributors_and_threads(
                 es_index=ES_INDEX, title=title, domain=dev_url
             )
-
+            # exclude the post authors
             for author in data['_source']['authors']:
                 contributors.remove(author)
             data['_source']['n_threads'] = counts
@@ -167,6 +167,7 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
             counts, contributors = elastic_search.es_fetch_contributors_and_threads(
                 es_index=ES_INDEX, title=title, domain=dev_url
             )
+            # exclude the post authors
             for author in data['_source']['authors']:
                 contributors.remove(author)
             data['_source']['n_threads'] = counts
@@ -230,6 +231,7 @@ def page_data_handling(data_list: list, get_unique_per_dev=False):
                     continue

                 if contributors:
+                    # exclude the post authors
                     for author in doc['_source']['authors']:
                         contributors.remove(author)
                 doc['_source']['n_threads'] = counts
diff --git a/generate_weekly_newsletter_json.py b/generate_weekly_newsletter_json.py
index bfb973e27..ce5de7781 100644
--- a/generate_weekly_newsletter_json.py
+++ b/generate_weekly_newsletter_json.py
@@ -57,6 +57,8 @@
     logger.success(f"TOTAL THREADS RECEIVED FOR '{dev_name}': {len(data_list)}")

     # NEW THREADS POSTS
+    # @TODO you already identify the original post by type==original_post,
+    # so you could get the posts in order by date and check if the original post is there
     seen_titles = set()
     for i in data_list:
         this_title = i['_source']['title']
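
Note on the `# exclude the post authors` annotations above: the loop they describe calls `contributors.remove(author)` once per post author, and `list.remove` raises `ValueError` when the element is missing, so it relies on every author also appearing in the aggregated contributor list. A minimal, illustrative sketch of the same exclusion written defensively (made-up data, not the repository's code):

    def exclude_authors(contributors, authors):
        """Return contributors minus the post's own authors, preserving order."""
        author_set = set(authors)
        return [name for name in contributors if name not in author_set]

    contributors = ["alice", "bob", "carol"]
    authors = ["alice", "dave"]  # "dave" wrote the post but never shows up as a contributor
    print(exclude_authors(contributors, authors))  # ['bob', 'carol']

The in-place `remove` in the patch is fine as long as the aggregation always includes the post authors; the set-based filter simply avoids a hard failure when it does not.
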
diff --git a/src/elasticsearch_utils.py b/src/elasticsearch_utils.py
index 68706d0bc..c45eb0bd5 100644
--- a/src/elasticsearch_utils.py
+++ b/src/elasticsearch_utils.py
@@ -127,12 +127,20 @@ def fetch_data_based_on_title(self, es_index, title, url):

     def extract_data_from_es(self, es_index, url, start_date_str, current_date_str,
                              exclude_combined_summary_docs=False):
+        """
+        Fetches and extracts documents from a specified Elasticsearch index based on URL,
+        date range, and an optional exclusion flag for combined summary documents.
+        The method returns a list of documents that match the query criteria.
+        """
        output_list = []
         start_time = time.time()

         if self._es_client.ping():
             logger.success("connected to the ElasticSearch")

+            # Construct a search query to filter documents by domain,
+            # date range (start to end date) and optionally exclude
+            # 'combined-summary' documents
             domain_query = self.get_domain_query(url)

             if exclude_combined_summary_docs:
@@ -228,7 +236,7 @@ def filter_top_recent_posts(self, es_results, top_n):

     def filter_top_active_posts(self, es_results, top_n):
         unique_results = []
-        thread_dict = {}
+        thread_dict = {}  # maps post titles to their respective activity levels
         # create dictionary with title as key and thread count as value
         for result in es_results:
             title = result['_source']['title']
@@ -238,6 +246,7 @@ def filter_top_active_posts(self, es_results, top_n):
                 domain=result['_source']['domain']
             )
             result['_source']['n_threads'] = counts
+            # exclude the post authors
             for author in result['_source']['authors']:
                 contributors.remove(author)
             result['_source']['n_threads'] = counts
@@ -246,7 +255,8 @@ def filter_top_active_posts(self, es_results, top_n):
             # add counts as value to thread_dict with a key as title
             thread_dict[title] = counts

-        # Use the dictionary created above, to sort the results
+        # Use the dictionary created above to sort the results
+        # posts with a higher thread count are placed at the top
         es_results_sorted = sorted(
             es_results, key=lambda x: thread_dict[x['_source']['title']],
             reverse=True
@@ -370,9 +380,13 @@ def get_earliest_posts_by_title(self, es_index, url, title):
         return earliest_post

     def es_fetch_contributors_and_threads(self, es_index, title, domain):
+        """
+        Fetches the count of threads and unique contributors for a given post based on title and domain
+        """
+        # The search query
         domain_query = self.get_domain_query(domain)
         query = {
-            "size": 0,
+            "size": 0,  # no search hits are returned, the focus is solely on the aggregations and counts
             "query": {
                 "bool": {
                     "must": [
@@ -381,6 +395,7 @@ def es_fetch_contributors_and_threads(self, es_index, title, domain):
                     ]
                 }
             },
+            # count unique authors associated with the matching documents
             "aggs": {
                 "authors_list": {
                     "terms": {
@@ -394,6 +409,7 @@ def es_fetch_contributors_and_threads(self, es_index, title, domain):
         response = self._es_client.search(index=es_index, body=query)
         counts = response['hits']['total']['value']
         if int(counts) > 0:
+            # exclude original post
             counts = int(counts) - 1
         contributors = [author['key'] for author in response['aggregations']['authors_list']['buckets']]
         return counts, contributors
@@ -427,7 +443,7 @@ def fetch_data_in_date_range(self, es_index, start_date, end_date, domain):
         return selected_threads

     def fetch_data_with_empty_summary(self, es_index, url=None, start_date_str=None, current_date_str=None):
-        logger.info(f"connecting ElasticSearch to fetch the docs with summary ... ")
+        logger.info(f"connecting ElasticSearch to fetch the docs with no summary ... ")

         output_list = []
         start_time = time.time()
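
The docstring and comments added to `es_fetch_contributors_and_threads` describe a query that returns no hits and does its work through the total count and a `terms` aggregation. A rough sketch of that query shape is below; the field names and the aggregation size are illustrative assumptions, not values taken from this repository:

    query = {
        "size": 0,  # no hits needed; only the total count and the aggregation are used
        "query": {
            "bool": {
                "must": [
                    {"match_phrase": {"title.keyword": "Some thread title"}},  # assumed field name
                ]
            }
        },
        "aggs": {
            "authors_list": {
                "terms": {
                    "field": "authors.keyword",  # assumed field name
                    "size": 100,  # a terms aggregation returns only 10 buckets by default
                }
            }
        },
    }

    # response = es_client.search(index="example-index", body=query)
    # counts = response["hits"]["total"]["value"]
    # contributors = [b["key"] for b in response["aggregations"]["authors_list"]["buckets"]]
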
diff --git a/src/xml_utils.py b/src/xml_utils.py
index fa1e157a0..cf4fe58fe 100644
--- a/src/xml_utils.py
+++ b/src/xml_utils.py
@@ -134,6 +134,12 @@ def generate_xml(self, feed_data, xml_file):
             f.write(feed_xml)

     def append_columns(self, df_dict, file, title, namespace):
+        """
+        Extract specific information from the given XML file corresponding to
+        a single post or reply within a thread and append this information to
+        the given dictionary (df_dict)
+        """
+        # Append default values for columns that will not be directly filled from the XML
         df_dict["body_type"].append(0)
         df_dict["id"].append(file.split("/")[-1].split("_")[0])
         df_dict["type"].append(0)
@@ -141,21 +147,26 @@ def append_columns(self, df_dict, file, title, namespace):
         df_dict["_id"].append(0)
         df_dict["_score"].append(0)

+        # The title is directly provided as a parameter
         df_dict["title"].append(title)

         # formatted_file_name = file.split("/static")[1]
         # logger.info(formatted_file_name)
+        # Parse the XML file to extract and append relevant data
         tree = ET.parse(file)
         root = tree.getroot()

+        # Extract and format the publication date
         date = root.find('atom:entry/atom:published', namespace).text
         datetime_obj = add_utc_if_not_present(date, iso_format=False)
         df_dict["created_at"].append(datetime_obj)

+        # Extract the URL from the 'link' element
         link_element = root.find('atom:entry/atom:link', namespace)
         link_href = link_element.get('href')
         df_dict["url"].append(link_href)

+        # Process and append the author's name, removing digits and unnecessary characters
         author = root.find('atom:author/atom:name', namespace).text
         author_result = re.sub(r"\d", "", author)
         author_result = author_result.replace(":", "")
@@ -164,6 +175,12 @@ def append_columns(self, df_dict, file, title, namespace):

     def file_not_present_df(self, columns, source_cols, df_dict, files_list, dict_data, data, title,
                             combined_filename, namespace):
+        """
+        Processes data directly from the given document (`data`) as no XML summary is
+        available for that document. Also, for each individual summary (XML file) that
+        already exists for the given thread, extracts and appends its content to the dictionary.
+        """
+        # Append basic data from dict_data for each column into df_dict
         for col in columns:
             df_dict[col].append(dict_data[data][col])
@@ -173,6 +190,14 @@ def file_not_present_df(self, columns, source_cols, df_dict, files_list, dict_da
                 df_dict[col].append(datetime_obj)
             else:
                 df_dict[col].append(dict_data[data]['_source'][col])
+
+        # For each individual summary (XML file) that exists for the
+        # given thread, extract and append their content to the dictionary
+        # TODO:
+        # This method is called for every post without a summary, which means that
+        # existing individual summaries for a thread are added n-1 times the amount
+        # of new posts in the thread at the time of execution of the cron job.
+        # This is not an issue because we then drop duplicates, but it's extra complexity.
         for file in files_list:
             file = file.replace("\\", "/")
             if os.path.exists(file):
@@ -184,6 +209,9 @@ def file_not_present_df(self, columns, source_cols, df_dict, files_list, dict_da
                     self.append_columns(df_dict, file, title, namespace)

                     if combined_filename in file:
+                        # TODO: the code will never reach this point
+                        # as we are already filtering per thread title so no
+                        # "Combined summary - X" filename will pass through
                         tree = ET.parse(file)
                         root = tree.getroot()
                         summary = root.find('atom:entry/atom:summary', namespace).text
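
The comments added to `append_columns` walk through namespace-aware lookups on the Atom XML summaries. A self-contained sketch of that ElementTree pattern, using an invented feed string in place of a real summary file:

    import re
    import xml.etree.ElementTree as ET

    namespace = {'atom': 'http://www.w3.org/2005/Atom'}

    # Invented Atom content, mirroring the structure the code expects.
    xml_text = """<feed xmlns="http://www.w3.org/2005/Atom">
      <author><name>123alice:</name></author>
      <entry>
        <title>Example thread</title>
        <published>2024-01-01T00:00:00+00:00</published>
        <link href="https://example.com/post/1"/>
        <summary>Example summary text</summary>
      </entry>
    </feed>"""

    root = ET.fromstring(xml_text)
    published = root.find('atom:entry/atom:published', namespace).text
    link_href = root.find('atom:entry/atom:link', namespace).get('href')
    author = root.find('atom:author/atom:name', namespace).text
    author = re.sub(r"\d", "", author).replace(":", "")  # "alice"
    print(published, link_href, author)
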
@@ -194,30 +222,48 @@ def file_not_present_df(self, columns, source_cols, df_dict, files_list, dict_da
             else:
                 logger.info(f"file not present: {file}")

-    def file_present_df(self, files_list, namespace, combined_filename, title, xmls_list, df_dict):
-        combined_file_fullpath = None
+    def file_present_df(self, files_list, namespace, combined_filename, title, individual_summaries_xmls_list, df_dict):
+        """
+        Iterates through the list of XML files, identifying the combined XML file and
+        individual summaries relevant to the given thread (as specified by title).
+        It copies the combined XML file to all relevant month folders. If no combined
+        summary exists, it extracts the content of individual summaries, appending it
+        to the data dictionary.
+        """
+        combined_file_fullpath = None  # the combined XML file if found
+        # List to keep track of the month folders that contain
+        # the XML files for the posts of the current thread
         month_folders = []
+
+        # Iterate through the list of local XML file paths
         for file in files_list:
             file = file.replace("\\", "/")
+            # Check if the current file is the combined XML file for the thread
             if combined_filename in file:
                 combined_file_fullpath = file
+            # Parse the XML file to find the title and compare it with the current title
+            # in order to understand if the post/file is part of the current thread
             tree = ET.parse(file)
             root = tree.getroot()
             file_title = root.find('atom:entry/atom:title', namespace).text

+            # If titles match, add the file to the list of relevant XMLs and track its month folder
             if title == file_title:
-                xmls_list.append(file)
+                individual_summaries_xmls_list.append(file)
                 month_folder_path = "/".join(file.split("/")[:-1])
                 if month_folder_path not in month_folders:
                     month_folders.append(month_folder_path)

+        # Ensure the combined XML file is copied to all relevant month folders
         for month_folder in month_folders:
             if combined_file_fullpath and combined_filename not in os.listdir(month_folder):
                 if combined_filename not in os.listdir(month_folder):
                     shutil.copy(combined_file_fullpath, month_folder)

-        if len(xmls_list) > 0 and not any(combined_filename in item for item in files_list):
+        # If individual summaries exist but no combined summary,
+        # extract and append their content to the dictionary
+        if len(individual_summaries_xmls_list) > 0 and not any(combined_filename in item for item in files_list):
             logger.info("individual summaries are present but not combined ones ...")
-            for file in xmls_list:
+            for file in individual_summaries_xmls_list:
                 self.append_columns(df_dict, file, title, namespace)
                 tree = ET.parse(file)
                 root = tree.getroot()
@@ -229,34 +275,46 @@ def preprocess_authors_name(self, author_tuple):
         return author_tuple

     def get_local_xml_file_paths(self, dev_url):
+        """
+        Retrieve paths for all relevant local XML files based on the given domain
+        """
         current_directory = os.getcwd()
         directory = get_base_directory(dev_url)
         files_list = glob.glob(os.path.join(current_directory, "static", directory, "**/*.xml"), recursive=True)
         return files_list

     def generate_new_emails_df(self, main_dict_data, dev_url):
+        # Define XML namespace for parsing XML files
         namespaces = {'atom': 'http://www.w3.org/2005/Atom'}
-
-        # get a locally stored xml files path
+
+        # Retrieve all existing XML files (summaries) for the given source
         files_list = self.get_local_xml_file_paths(dev_url)

+        # Initialize a dictionary to store data for DataFrame construction, with predefined columns
         columns = ['_index', '_id', '_score']
         source_cols = ['body_type', 'created_at', 'id', 'title', 'body', 'type', 'url', 'authors']
         df_dict = {col: [] for col in (columns + source_cols)}

         seen_titles = set()
+        # Process each document in the input data
         for idx in range(len(main_dict_data)):
-            xmls_list = []
-            this_title = main_dict_data[idx]["_source"]["title"]
-            if this_title in seen_titles:
+            xmls_list = []  # the existing XML files for the thread that the fetched document is part of
+            thread_title = main_dict_data[idx]["_source"]["title"]
+            if thread_title in seen_titles:
                 continue
-            # generate xml for all the docs that fall under this title
+            # `files_list` contains all existing XML files for the current thread
+            # but older threads might lack corresponding XML files if they were
+            # inactive when we began creating summaries. When such threads become
+            # active, XML files for their posts/docs are absent.
+            # To address this, we fetch all documents under the active thread to
+            # prepare for XML generation in subsequent processing steps.
             title_dict_data = elastic_search.fetch_data_based_on_title(
-                es_index=ES_INDEX, title=this_title, url=dev_url
+                es_index=ES_INDEX, title=thread_title, url=dev_url
             )

             for data_idx in range(len(title_dict_data)):
+                # Extract relevant identifiers and metadata from the document
                 title = title_dict_data[data_idx]["_source"]["title"]
                 number = get_id(title_dict_data[data_idx]["_source"]["id"])
                 xml_name = clean_title(title)
@@ -264,6 +322,7 @@ def generate_new_emails_df(self, main_dict_data, dev_url):
                 combined_filename = f"combined_{xml_name}.xml"
                 created_at = title_dict_data[data_idx]["_source"]["created_at"]

+                # Check if the XML file for the document exists
                 if not any(file_name in item for item in files_list):
                     logger.info(f"Not present: {created_at} | {file_name}")
                     self.file_not_present_df(columns, source_cols, df_dict, files_list, title_dict_data, data_idx,
@@ -272,9 +331,11 @@ def generate_new_emails_df(self, main_dict_data, dev_url):
                     logger.info(f"Present: {created_at} | {file_name}")
                     self.file_present_df(files_list, namespaces, combined_filename, title, xmls_list, df_dict)

-            seen_titles.add(this_title)
+            seen_titles.add(thread_title)

+        # Convert the dictionary to a pandas DataFrame for structured data representation
         emails_df = pd.DataFrame(df_dict)
+        # Clean and preprocess fields in the DataFrame
         emails_df['authors'] = emails_df['authors'].apply(convert_to_tuple)
         emails_df = emails_df.drop_duplicates()
         emails_df['authors'] = emails_df['authors'].apply(self.preprocess_authors_name)
@@ -307,12 +368,13 @@ def generate_local_xml(cols, combine_flag, url):
             # construct a file path
             file_path = f"{dir_path}/{number}_{xml_name}.xml"

-            # check if the file exists
+            # Check if we already created a summary for this post in the past
             if os.path.exists(file_path):
+                # if XML file exists, we already created a summary
                 logger.info(f"Exist: {file_path}")
                 return fr"{directory}/{str_month_year}/{number}_{xml_name}.xml"

-            # create file if not exist
+            # No summary was found, we need to create one
             logger.info(f"Not found: {file_path}")
             summary = create_summary(cols['body'])
             feed_data = {
@@ -330,10 +392,11 @@ def generate_local_xml(cols, combine_flag, url):
         os_name = platform.system()
         # logger.info(f"Operating System: {os_name}")

-        # get unique titles
+        # Identify threads by getting unique titles across posts
         titles = emails_df.sort_values('created_at')['title'].unique()
         logger.info(f"Total titles in data: {len(titles)}")

         for title_idx, title in tqdm(enumerate(titles)):
+            # Filter emails by title and prepare them for XML generation
             title_df = emails_df[emails_df['title'] == title]
             title_df['authors'] = title_df['authors'].apply(convert_to_tuple)
             title_df = title_df.drop_duplicates()
@@ -341,16 +404,23 @@ def generate_local_xml(cols, combine_flag, url):
             title_df = title_df.sort_values(by='created_at', ascending=False)
             logger.info(f"Number of docs for title: {title}: {len(title_df)}")

+            # Handle threads with single and multiple documents differently
             if len(title_df) < 1:
                 continue
             if len(title_df) == 1:
+                # Don't create combined summary for threads with no replies
                 generate_local_xml(title_df.iloc[0], "0", url)
                 continue

+            # COMBINED SUMMARY GENERATION
+            # Combine the individual posts of the thread into combined_body
             combined_body = '\n\n'.join(title_df['body'].apply(str))
             xml_name = clean_title(title)
+            # Generate XML files (if not exist) for each post in the thread, collecting their paths into combined_links
             combined_links = list(title_df.apply(generate_local_xml, args=("1", url), axis=1))
+            # Generate a list of strings, each combining the first author's name with their post's creation date
             combined_authors = list(
                 title_df.apply(lambda x: f"{x['authors'][0]} {x['created_at']}", axis=1))
+            # Group emails by month and year based on their creation date to process them in time-based segments
             month_year_group = \
                 title_df.groupby([title_df['created_at'].dt.month, title_df['created_at'].dt.year])
@@ -363,7 +433,9 @@ def generate_local_xml(cols, combine_flag, url):
                 directory = get_base_directory(url)
                 file_path = fr"static/{directory}/{str_month_year}/combined_{xml_name}.xml"
-
+                # Generate a single combined thread summary using:
+                # - the individual summaries of previous posts
+                # - the actual content of newer posts
                 combined_summary = create_summary(combined_body)
                 feed_data = {
                     'id': "2",
@@ -374,11 +446,16 @@ def generate_local_xml(cols, combine_flag, url):
                     'created_at': add_utc_if_not_present(title_df.iloc[0]['created_at_org']),
                     'summary': combined_summary
                 }
+                # We use a flag to check if the XML file for the
+                # combined summary is generated for the first time
                 if not flag:
+                    # Generate XML only once for the first month-year and keep its path
                     self.generate_xml(feed_data, file_path)
                     std_file_path = file_path
                     flag = True
                 else:
+                    # For subsequent month-year groups, copy the initially
+                    # created XML file instead of creating a new one
                     if os_name == "Windows":
                         shutil.copy(std_file_path, file_path)
                     elif os_name == "Linux":
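
The comments in the last hunks explain that the combined summary is generated once and then copied into every month folder the thread spans. A tiny pandas sketch of the month/year grouping that drives those folders, using invented data:

    import pandas as pd

    title_df = pd.DataFrame({
        "title": ["Example thread"] * 3,
        "created_at": pd.to_datetime(
            ["2024-01-05", "2024-01-20", "2024-02-02"], utc=True
        ),
    })

    month_year_group = title_df.groupby(
        [title_df["created_at"].dt.month, title_df["created_at"].dt.year]
    )
    for (month, year), group in month_year_group:
        print(f"{month:02d}-{year}: {len(group)} post(s)")
    # 01-2024: 2 post(s)
    # 02-2024: 1 post(s)
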