Store logs in csv file #57

Open
wants to merge 3 commits into base: main
Changes from 1 commit
566 changes: 293 additions & 273 deletions generate_homepage_xml.py

Large diffs are not rendered by default.

397 changes: 208 additions & 189 deletions generate_weekly_newsletter_json.py

Large diffs are not rendered by default.

113 changes: 71 additions & 42 deletions push_combined_summary_to_es.py
@@ -3,66 +3,95 @@
from loguru import logger
import glob
import os
from collections import defaultdict

from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient
from src.xml_utils import XMLReader
from src.utils import remove_timestamps_from_author_names, log_csv

if __name__ == "__main__":

    inserted_count = defaultdict(set)
    updated_count = defaultdict(set)
    no_changes_count = defaultdict(set)
    unique_urls = set()
    error_occurred = False
    error_message = "---"

    try:
        REMOVE_TIMESTAMPS_IN_AUTHORS = True

        xml_reader = XMLReader()
        elastic_search = ElasticSearchClient()

        total_combined_files = []
        static_dirs = [
            'bitcoin-dev',
            'lightning-dev',
            'delvingbitcoin'
        ]
        pattern = "combined*.xml"

        for static_dir in static_dirs:
            combined_files = glob.glob(f"static/{static_dir}/**/{pattern}")
            total_combined_files.extend(combined_files)
        logger.info(f"Total combined files: {(len(total_combined_files))}")

        # get unique combined file paths
        total_combined_files_dict = {os.path.splitext(os.path.basename(i))[0]: i for i in total_combined_files}
        logger.info(f"Total unique combined files: {len(total_combined_files_dict)}")

        for file_name, full_path in tqdm.tqdm(total_combined_files_dict.items()):
            try:
                # get data from xml file
                xml_file_data = xml_reader.read_xml_file(full_path)
                url = xml_file_data['domain']
                unique_urls.add(url)

                if REMOVE_TIMESTAMPS_IN_AUTHORS:
                    # remove timestamps from author's names and collect unique names only
                    xml_file_data['authors'] = remove_timestamps_from_author_names(xml_file_data['authors'])

                body = {
                    'doc': xml_file_data,
                    'doc_as_upsert': True
                }

                res = elastic_search.upsert_document(index_name=ES_INDEX,
                                                     doc_id=file_name,
                                                     doc_body=body)

                if res['result'] == 'created' or res['result'] == 'updated':
                    updated_count[url].add(res['_id'])
                elif res['result'] == 'noop':
                    no_changes_count[url].add(res['_id'])

            except Exception as ex:
                logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}")
                logger.warning(full_path)

        logger.success(f"Process complete.")
        logger.success(f"Error Occurred: {error_occurred}")

    except Exception as ex:
        error_occurred = True
        error_message = str(ex)
        logger.error(f"Error Occurred: {error_occurred}")
        logger.error(f"Error Message: {error_message}")
        logger.error(f"Process failed.")

    finally:
        for url in unique_urls:
            try:
                log_csv(file_name='push_combined_summary_to_es',
                        url=url,
                        inserted=len(inserted_count[url]),
                        updated=len(updated_count[url]),
                        no_changes=len(no_changes_count[url]),
                        error=str(error_occurred),
                        error_log=error_message)
            except Exception as ex:
                error_message = f"Error occurred: {ex} \n{traceback.format_exc()}"
                logger.error(error_message)

            logger.info(f"URL:->{url}"
                        f" :: Inserted Count:->{len(inserted_count[url])}"
                        f" :: Updated Count:->{len(updated_count[url])}"
                        f" :: No Changed Count:->{len(no_changes_count[url])}.")

        logger.success("Process Complete.")
111 changes: 73 additions & 38 deletions push_summary_to_es.py
@@ -5,7 +5,7 @@
from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient
from src.xml_utils import XMLReader
from src.utils import log_csv

if __name__ == "__main__":

@@ -21,47 +21,82 @@
        "https://gnusha.org/pi/bitcoindev/"
    ]

    error_occurred = False
    error_message = "---"

    try:
        url_status = {}
        for dev_url in dev_urls:
            url_updated_count = set()
            try:
                if APPLY_DATE_RANGE:
                    current_date_str = None
                    if not current_date_str:
                        current_date_str = datetime.now().strftime("%Y-%m-%d")
                    start_date = datetime.now() - timedelta(days=15)
                    start_date_str = start_date.strftime("%Y-%m-%d")
                    logger.info(f"start_date: {start_date_str}")
                    logger.info(f"current_date_str: {current_date_str}")
                else:
                    start_date_str = None
                    current_date_str = None

                docs_list = elastic_search.fetch_data_with_empty_summary(ES_INDEX, dev_url, start_date_str,
                                                                         current_date_str)

                if isinstance(dev_url, list):
                    dev_name = dev_url[0].split("/")[-2]
                else:
                    dev_name = dev_url.split("/")[-2]

                logger.info(f"Total threads received with empty summary for '{dev_name}': {len(docs_list)}")

                for doc in tqdm.tqdm(docs_list):
                    try:
                        doc_id = doc['_id']
                        doc_index = doc['_index']
                        if not doc['_source'].get('summary'):
                            xml_summary = xml_reader.get_xml_summary(doc, dev_name)
                            if xml_summary:
                                res = elastic_search.es_client.update(
                                    index=doc_index,
                                    id=doc_id,
                                    body={
                                        'doc': {
                                            "summary": xml_summary
                                        }
                                    }
                                )
                                url_updated_count.add(res['_id'])
                    except Exception as ex:
                        logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}")
                        logger.warning(doc)

                logger.success(f"Process complete for dev_url: {dev_url}")

            except Exception as ex:
                logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}")
                error_occurred = True
                error_message = str(ex)

            finally:
                log_csv(
                    file_name='push_summary_to_elasticsearch',
                    url=dev_url,
                    updated=len(url_updated_count),
                    error=str(error_occurred),
                    error_log=error_message)

                url_status[dev_url] = {
                    "updated_count": len(url_updated_count),
                    "error_occurred": error_occurred,
                }

        logger.success("Process complete for all domain ")
        logger.info("Summary of updates for all URLs:")
        for url, status in url_status.items():
            logger.info(f"URL:->{url}"
                        f" :: Updated Count: {status['updated_count']} "
                        f" :: Error Occurred: {status['error_occurred']}.")

    except Exception as ex:
        logger.error(f"Error: {str(ex)}\n{traceback.format_exc()}")
53 changes: 44 additions & 9 deletions src/elasticsearch_utils.py
@@ -1,5 +1,6 @@
import time
from datetime import datetime

from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch.helpers import scan
from loguru import logger
@@ -23,10 +24,44 @@ def __init__(self,
            http_auth=(self._es_username, self._es_password),
        )

    def get_domain_counts(self, index_name, domain):
        """Function to get the total counts for the given 'domain' field from Elasticsearch index."""
        body = {
            "query": {
                "term": {
                    "domain.keyword": domain
                }
            }
        }

        try:
            resp = self.es_client.count(index=index_name, body=body)
            return resp['count']
        except Exception as e:
            logger.error(f"Error fetching domain counts: {e}")
            return None
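
get_domain_counts is not exercised by the scripts shown in this diff; a minimal usage sketch, assuming the ES_INDEX config value and one of the dev URLs used elsewhere in the repository:

# Hypothetical usage of get_domain_counts; the index and domain values are assumptions.
from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient

elastic_search = ElasticSearchClient()
count = elastic_search.get_domain_counts(index_name=ES_INDEX,
                                         domain="https://gnusha.org/pi/bitcoindev/")
print(f"Documents for domain: {count}")  # prints None if the count request failed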

    @property
    def es_client(self):
        return self._es_client

    def upsert_document(self, index_name, doc_id, doc_body):
        script = {
            "source": "ctx._source.putAll(params)",
            "params": doc_body
        }

        request_body = {
            "scripted_upsert": True,
            "script": script,
            "upsert": doc_body
        }

        # Perform the upsert operation
        response = self._es_client.update(index=index_name, id=doc_id, body=request_body)
        return response
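
push_combined_summary_to_es.py above calls this method and then branches on res['result'] ('created', 'updated', or 'noop'), which are standard result values of the Elasticsearch update API response. A minimal standalone sketch of a call (the index name, document id, and body are assumptions):

# Hypothetical call to upsert_document; the values below are illustrative only.
from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient

elastic_search = ElasticSearchClient()
res = elastic_search.upsert_document(index_name=ES_INDEX,
                                     doc_id="example-doc-id",
                                     doc_body={"title": "Example thread",
                                               "domain": "https://gnusha.org/pi/bitcoindev/"})
print(res["result"], res["_id"])  # e.g. 'created' on first insert, 'updated' afterwards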

    def get_domain_query(self, url):
        if isinstance(url, list):
            domain_query = {"terms": {"domain.keyword": url}}
@@ -90,15 +125,15 @@ def fetch_data_based_on_title(self, es_index, title, url):
                "must": [
                    {
                        "match_phrase":
                            {
                                "title.keyword": title
                            }
                    },
                    {
                        "term":
                            {
                                "domain.keyword": str(url)
                            }
                    }
                ]
            }
@@ -236,7 +271,7 @@ def filter_top_recent_posts(self, es_results, top_n):

    def filter_top_active_posts(self, es_results, top_n):
        unique_results = []

        thread_dict = {}  # maps post titles to their respective activity levels
        # create dictionary with title as key and thread count as value
        for result in es_results:
            title = result['_source']['title']
@@ -383,10 +418,10 @@ def es_fetch_contributors_and_threads(self, es_index, title, domain):
        """
        Fetches the count of threads and unique contributors for a given post based on title and domain
        """
        # The search query
        domain_query = self.get_domain_query(domain)
        query = {
            "size": 0,  # no search hits are returned, the focus is solely on the aggregations and counts
            "query": {
                "bool": {
                    "must": [