Store logs in CSV file #1

Open · wants to merge 1 commit into base: main
11 changes: 11 additions & 0 deletions .github/workflows/push_topics_to_elasticsearch_cron_job.yml
@@ -36,3 +36,14 @@ jobs:
 
       - name: Execute Python script
        run: python push_topic_modeling_to_es.py
+
+      - name: Add and commit changes
+        run: |
+          git add .
+          if git diff --staged --quiet; then
+            echo "No changes to commit"
+          else
+            git pull
+            git commit -m "Updated logs"
+            git push
+          fi
11 changes: 11 additions & 0 deletions .github/workflows/update_vector_embeddings_cron_job.yml
@@ -36,3 +36,14 @@ jobs:
 
       - name: Execute Python script
        run: python update_vector_embedding_to_es.py
+
+      - name: Add and commit changes
+        run: |
+          git add .
+          if git diff --staged --quiet; then
+            echo "No changes to commit"
+          else
+            git pull
+            git commit -m "Updated logs"
+            git push
+          fi
423 changes: 233 additions & 190 deletions generate_topic_modeling_csv.py

Large diffs are not rendered by default.

217 changes: 128 additions & 89 deletions push_topic_modeling_to_es.py
@@ -7,7 +7,7 @@
 import pandas as pd
 import traceback
 import ast
-
+from src.utils import log_csv
 from src.config import ES_INDEX
 from src.elasticsearch_utils import ElasticSearchClient
 
@@ -17,10 +17,6 @@
 
 if __name__ == "__main__":
 
-    # logs automatically rotate log file
-    os.makedirs("logs", exist_ok=True)
-    logger.add(f"logs/push_topic_modeling_to_es.log", rotation="23:59")
-
     delay = 3
     elastic_search = ElasticSearchClient()
 
@@ -29,89 +25,132 @@
         "https://lists.linuxfoundation.org/pipermail/bitcoin-dev/",
         "https://delvingbitcoin.org/",
         "https://gnusha.org/pi/bitcoindev/",
         "https://mailing-list.bitcoindevs.xyz/bitcoindev/",
     ]
 
-    for dev_url in dev_urls:
-        logger.info(f"dev_url: {dev_url}")
-        dev_name = dev_url.split("/")[-2]
-
-        # if APPLY_DATE_RANGE is set to False, elasticsearch will fetch all the docs in the index
-        APPLY_DATE_RANGE = False
-
-        OUTPUT_DIR = "gpt_output"
-        CSV_FILE_PATH = f"{OUTPUT_DIR}/topic_modeling_{dev_name}.csv"
-
-        if os.path.exists(CSV_FILE_PATH):
-            stored_df = pd.read_csv(CSV_FILE_PATH)
-            logger.info(f"Shape of stored df: {stored_df.shape}")
-            stored_df.set_index("source_id", inplace=True)
-        else:
-            logger.info(f"No data found in CSV! Path: {CSV_FILE_PATH}")
-            continue
-
-        if APPLY_DATE_RANGE:
-            current_date_str = None
-            if not current_date_str:
-                current_date_str = datetime.now().strftime("%Y-%m-%d")
-            start_date = datetime.now() - timedelta(days=7)
-            start_date_str = start_date.strftime("%Y-%m-%d")
-            logger.info(f"start_date: {start_date_str}")
-            logger.info(f"current_date_str: {current_date_str}")
-        else:
-            start_date_str = None
-            current_date_str = None
-
-        docs_list = elastic_search.fetch_data_for_empty_field(
-            es_index=ES_INDEX, url=dev_url, field_name=["primary_topics", "secondary_topics"],
-            start_date_str=start_date_str, current_date_str=current_date_str
-        )
-        logger.success(f"TOTAL THREADS RECEIVED WITH AN EMPTY ['primary_topics', 'secondary_topics']: {len(docs_list)}")
-
-        if docs_list:
-            for idx, doc in enumerate(tqdm.tqdm(docs_list)):
-                doc_source_id = doc['_source']['id']
-                doc_id = doc['_id']
-                doc_index = doc['_index']
-                logger.info(f"working on document with '_id': {doc_id} | 'title': {doc['_source']['title']}")
-
-                if not doc['_source'].get('primary_topics'):
-                    try:
-                        this_row = stored_df.loc[doc_source_id]
-
-                        if not this_row.empty:
-                            primary_kw = ast.literal_eval(this_row['primary_topics'])
-                            secondary_kw = ast.literal_eval(this_row['secondary_topics'])
-
-                            # update a primary topic
-                            elastic_search.es_client.update(
-                                index=doc_index,
-                                id=doc_id,
-                                body={
-                                    'doc': {
-                                        "primary_topics": primary_kw if primary_kw else []
-                                    }
-                                }
-                            )
-
-                            # update a secondary topic
-                            elastic_search.es_client.update(
-                                index=doc_index,
-                                id=doc_id,
-                                body={
-                                    'doc': {
-                                        "secondary_topics": secondary_kw if secondary_kw else []
-                                    }
-                                }
-                            )
-                        else:
-                            logger.info(f"No data found for this doc in csv! Doc Id: {doc_id}, Source Id: {doc_source_id}")
-
-                    except KeyError:
-                        logger.error(f"Error Occurred: doc_source_id does not exist in stored_df! doc_source_id: {doc_source_id}")
-
-                    except Exception as ex:
-                        logger.error(f"Error updating ES index:{str(ex)}\n{traceback.format_exc()}")
-                else:
-                    logger.info(f"Exist: {doc['_source'].get('primary_topics')}")
-
-        logger.success(f"Process complete for dev_url: {dev_url}")
+    try:
+        # logs automatically rotate log file
+        os.makedirs("logs", exist_ok=True)
+        logger.add(f"logs/push_topic_modeling_to_es.log", rotation="23:59")
+
+        url_status = {}
+
+        for dev_url in dev_urls:
+            updated_count = set()
+            error_message = "---"
+            error_occurred = False
+            try:
+                logger.info(f"dev_url: {dev_url}")
+                dev_name = dev_url.split("/")[-2]
+
+                # if APPLY_DATE_RANGE is set to False, elasticsearch will fetch all the docs in the index
+                APPLY_DATE_RANGE = False
+
+                OUTPUT_DIR = "gpt_output"
+                CSV_FILE_PATH = f"{OUTPUT_DIR}/topic_modeling_{dev_name}.csv"
+
+                if os.path.exists(CSV_FILE_PATH):
+                    stored_df = pd.read_csv(CSV_FILE_PATH)
+                    logger.info(f"Shape of stored df: {stored_df.shape}")
+                    stored_df.set_index("source_id", inplace=True)
+                else:
+                    logger.info(f"No data found in CSV! Path: {CSV_FILE_PATH}")
+                    continue
+
+                if APPLY_DATE_RANGE:
+                    current_date_str = None
+                    if not current_date_str:
+                        current_date_str = datetime.now().strftime("%Y-%m-%d")
+                    start_date = datetime.now() - timedelta(days=7)
+                    start_date_str = start_date.strftime("%Y-%m-%d")
+                    logger.info(f"start_date: {start_date_str}")
+                    logger.info(f"current_date_str: {current_date_str}")
+                else:
+                    start_date_str = None
+                    current_date_str = None
+
+                docs_list = elastic_search.fetch_data_for_empty_field(
+                    es_index=ES_INDEX, url=dev_url, field_name=["primary_topics", "secondary_topics"],
+                    start_date_str=start_date_str, current_date_str=current_date_str
+                )
+                logger.success(f"TOTAL THREADS RECEIVED WITH AN EMPTY ['primary_topics', 'secondary_topics']: "
+                               f"{len(docs_list)}")
+
+                if docs_list:
+                    for idx, doc in enumerate(tqdm.tqdm(docs_list)):
+                        doc_source_id = doc['_source']['id']
+                        doc_id = doc['_id']
+                        doc_index = doc['_index']
+                        # logger.info(f"working on document with '_id': {doc_id} | 'title': {doc['_source']['title']}")
+
+                        if not doc['_source'].get('primary_topics'):
+                            try:
+                                this_row = stored_df.loc[doc_source_id]
+
+                                if not this_row.empty:
+                                    primary_kw = ast.literal_eval(this_row['primary_topics'])
+                                    secondary_kw = ast.literal_eval(this_row['secondary_topics'])
+
+                                    # update a primary topic
+                                    res_p = elastic_search.es_client.update(
+                                        index=doc_index,
+                                        id=doc_id,
+                                        body={
+                                            'doc': {
+                                                "primary_topics": primary_kw if primary_kw else []
+                                            }
+                                        }
+                                    )
+                                    updated_count.add(res_p["_id"])
+
+                                    # update a secondary topic
+                                    res_s = elastic_search.es_client.update(
+                                        index=doc_index,
+                                        id=doc_id,
+                                        body={
+                                            'doc': {
+                                                "secondary_topics": secondary_kw if secondary_kw else []
+                                            }
+                                        }
+                                    )
+                                    updated_count.add(res_s["_id"])
+
+                                else:
+                                    logger.info(f"No data found for this doc in csv! Doc Id: {doc_id}, Source Id: "
+                                                f"{doc_source_id}")
+
+                            except KeyError:
+                                logger.error(f"Error Occurred: doc_source_id does not exist in stored_df! "
+                                             f"doc_source_id: {doc_source_id}")
+
+                            except Exception as ex:
+                                logger.error(f"Error updating ES index:{str(ex)}\n{traceback.format_exc()}")
+                        else:
+                            logger.info(f"Exist: {doc['_source'].get('primary_topics')}")
+
+                logger.success(f"Process complete for dev_url: {dev_url}")
+
+            except Exception as ex:
+                error_occurred = True
+                error_message = str(ex)
+
+            finally:
+                log_csv(file_name="push_topic_modeling_to_es",
+                        url=dev_url,
+                        updated=len(updated_count),
+                        error_log=error_message,
+                        error=str(error_occurred)
+                        )
+
+                url_status[dev_url] = {
+                    "updated_count": len(updated_count),
+                    "error_occurred": error_occurred,
+                }
+
+        logger.success("Process complete for all domain ")
+        logger.info("Summary of updates for all URLs:")
+        for url, status in url_status.items():
+            logger.info(
+                f"URL: {url} - Updated Count: {status['updated_count']} - Error Occurred: {status['error_occurred']}.")
+
+    except Exception as ex:
+        logger.error(f"Error: {str(ex)}\n{traceback.format_exc()}")
17 changes: 17 additions & 0 deletions src/elasticsearch_utils.py
@@ -20,6 +20,23 @@ def __init__(self,
             http_auth=(self._es_username, self._es_password),
         )
 
+    def get_domain_counts(self, index_name, domain):
+        """Function to get the total counts for the given 'domain' field from Elasticsearch index."""
+        body = {
+            "query": {
+                "term": {
+                    "domain.keyword": domain
+                }
+            }
+        }
+
+    try:
+            resp = self.es_client.count(index=index_name, body=body)
+            return resp['count']
+        except Exception as e:
+            logger.error(f"Error fetching domain counts: {e}")
+            return None
+
     @property
     def es_client(self):
         return self._es_client
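
get_domain_counts wraps the Elasticsearch Count API with an exact term query on domain.keyword and returns None instead of raising when the request fails. A hedged usage sketch: the index name below is illustrative (in this PR it is read from the INDEX environment variable in log_csv), and the domain is one of the dev_urls shown earlier.

from src.elasticsearch_utils import ElasticSearchClient

client = ElasticSearchClient()
count = client.get_domain_counts(
    index_name="bitcoin-search-index",       # hypothetical index name
    domain="https://delvingbitcoin.org/",    # term query, must match domain.keyword exactly
)
if count is None:
    print("count lookup failed; see logs")   # error path returns None
else:
    print(f"documents for domain: {count}")
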
44 changes: 39 additions & 5 deletions src/utils.py
@@ -1,21 +1,23 @@
 import os
 import re
-import shutil
+# import shutil
 import traceback
 import pandas as pd
 from dotenv import load_dotenv
 from dateutil.parser import parse
 from loguru import logger
 import warnings
-
+from datetime import datetime
+import csv
+from src.elasticsearch_utils import ElasticSearchClient
 warnings.filterwarnings("ignore")
 load_dotenv()
 
 
 month_dict = {
-        1: "Jan", 2: "Feb", 3: "March", 4: "April", 5: "May", 6: "June",
-        7: "July", 8: "Aug", 9: "Sept", 10: "Oct", 11: "Nov", 12: "Dec"
-    }
+    1: "Jan", 2: "Feb", 3: "March", 4: "April", 5: "May", 6: "June",
+    7: "July", 8: "Aug", 9: "Sept", 10: "Oct", 11: "Nov", 12: "Dec"
+}
 
 # pre-compile the patterns
 regex_url = re.compile(r'http\S+|www\S+|https\S+')
@@ -243,3 +245,35 @@ def get_duplicated_docs_ids(df):
     logger.info(f"Total: {len(total_ids)}, Keeping: {df_to_keep.shape[0]}, Dropping: {len(ids_to_drop)}")
 
     return ids_to_drop
+
+
+def log_csv(file_name, url=None, inserted=0, updated=0, no_changes=0, folder_path="daily_logs",
+            error="False", error_log="---"):
+    date = datetime.utcnow().strftime("%d_%m_%Y")
+    month_year = datetime.utcnow().strftime("%Y_%m")
+    time = datetime.utcnow().strftime("%H:%M:%S")
+
+    log_folder_path = os.path.join(folder_path, month_year)
+    if not os.path.exists(log_folder_path):
+        os.makedirs(log_folder_path)
+
+    csv_file_path = os.path.join(log_folder_path, f'{date}_logs.csv')
+    with open(csv_file_path, mode='a', newline='') as csv_file:
+        writer = csv.writer(csv_file)
+        if csv_file.tell() == 0:
+            writer.writerow(
+                ['Date', 'Time', 'File name', 'URL', 'Inserted records', 'Updated records', 'No changes records',
+                 'Total records', 'Error', 'Error log'])
+
+        total_docs = 0
+
+        if isinstance(url, str):
+            total_docs = ElasticSearchClient().get_domain_counts(index_name=os.getenv('INDEX'), domain=url)
+
+        elif isinstance(url, list):
+            for i in url:
+                t_docs = ElasticSearchClient().get_domain_counts(index_name=os.getenv('INDEX'), domain=i)
+                total_docs += t_docs
+
+        writer.writerow([date, time, file_name, url, inserted, updated, no_changes, total_docs, error, error_log])
+    logger.success("CSV Update Successfully :)")