diff --git a/compose.yml b/compose.yml index 218caa6..c6e4c7b 100644 --- a/compose.yml +++ b/compose.yml @@ -7,6 +7,7 @@ services: dockerfile: .devcontainer/Dockerfile image: cal-itp/customer-success:dev entrypoint: [] + env_file: .env command: sleep infinity ports: - "8000" diff --git a/notes/__init__.py b/notes/__init__.py index 434dff0..f2c806f 100644 --- a/notes/__init__.py +++ b/notes/__init__.py @@ -1,4 +1,7 @@ +import logging from pathlib import Path +import sys +logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s [%(levelname)s] (%(name)s): %(message)s") NOTES_PATH = Path("data/notes.json") diff --git a/notes/download.py b/notes/download.py index 9fa18af..c50612d 100644 --- a/notes/download.py +++ b/notes/download.py @@ -1,3 +1,4 @@ +import logging import os from pathlib import Path import sys @@ -9,6 +10,8 @@ from data.utils import chunk_list, hubspot_get_all_pages, hubspot_to_df, write_json_records, HubspotUserApi from notes import NOTES_PATH +logger = logging.getLogger("notes/download") + ACCESS_TOKEN = os.environ["HUBSPOT_ACCESS_TOKEN"] MAX_PAGES = int(os.environ.get("HUBSPOT_MAX_PAGES", sys.maxsize)) @@ -31,18 +34,23 @@ def get_last_note_id(): try: - return LAST_NOTE_PATH.read_text(encoding="utf-8").strip() + last_note_id = LAST_NOTE_PATH.read_text(encoding="utf-8").strip() + logger.info(f"Read last_note_id: {last_note_id}") + return last_note_id except FileNotFoundError: + logger.warning("File not found: last_note_id") return None def update_last_note_id(last_note_id): + logger.info(f"Updating last_note_id: {last_note_id}") LAST_NOTE_PATH.write_text(str(last_note_id).strip(), encoding="utf-8") -def get_notes() -> pd.DataFrame: +def get_notes(last_note_id: str) -> pd.DataFrame: + logger.info(f"Requesting notes after: {last_note_id}") + note_props = ["hs_created_by", "hs_createdate", "hs_note_body"] - last_note_id = get_last_note_id() notes_responses = hubspot_get_all_pages( hubspot_notes_api, @@ -52,13 +60,17 @@ def 
get_notes() -> pd.DataFrame: properties=note_props, associations=ASSOCIATION_TYPES, ) + notes_df = hubspot_to_df(notes_responses) + logger.info(f"Received {len(notes_df)} new notes") update_last_note_id(notes_responses[-1].results[-1].id) - return hubspot_to_df(notes_responses) + return notes_df + +def preprocess_notes(notes: pd.DataFrame, last_note_id: str) -> pd.DataFrame: + original_size = len(notes) -def preprocess_notes(notes: pd.DataFrame) -> pd.DataFrame: # rename vendor association column # weird name, maybe because it is a custom association type? if "associations.p5519226_vendors.results" in notes.columns: @@ -81,6 +93,9 @@ def preprocess_notes(notes: pd.DataFrame) -> pd.DataFrame: # and rename some for simplicity notes = notes.rename(columns=cols, errors="ignore") + # drop the last_note_id if it was included + notes = notes.drop(index=notes.loc[notes["id_note"].eq(last_note_id)].index) + # drop notes without a body notes = notes.dropna(subset=["body"]) @@ -129,6 +144,11 @@ def preprocess_notes(notes: pd.DataFrame) -> pd.DataFrame: notes = notes.drop(columns=ASSOCIATION_COLUMNS, errors="ignore") notes.reset_index(drop=True, inplace=True) + if original_size != len(notes): + logger.info(f"Filtered {original_size} notes to {len(notes)} with matching criteria") + else: + logger.info(f"All {len(notes)} notes have matching criteria") + return notes @@ -155,6 +175,8 @@ def join_companies(notes: pd.DataFrame) -> pd.DataFrame: ) companies.reset_index(drop=True, inplace=True) + logger.info(f"Joining {len(companies)} company records to notes") + # merge back with the notes DataFrame # left join since some notes are associated with vendors and not companies # so want to keep all the notes @@ -192,6 +214,8 @@ def join_users(notes: pd.DataFrame) -> pd.DataFrame: ) users.reset_index(drop=True, inplace=True) + logger.info("Joining user records to notes") + # merge back with the notes DataFrame notes = notes.merge(users, how="left", on="id_user") 
notes.reset_index(drop=True, inplace=True) @@ -220,6 +244,8 @@ def join_vendors(notes: pd.DataFrame) -> pd.DataFrame: ) vendors.reset_index(drop=True, inplace=True) + logger.info(f"Joining {len(vendors)} vendor records to notes") + # merge back with the notes DataFrame notes = notes.merge(vendors, how="left", on="id_vendor") notes.reset_index(drop=True, inplace=True) @@ -228,11 +254,16 @@ if __name__ == "__main__": - notes = get_notes() + logger.info("Starting to download Hubspot notes") - notes = preprocess_notes(notes) + last_note_id = get_last_note_id() + + notes = get_notes(last_note_id) + notes = preprocess_notes(notes, last_note_id) notes = join_companies(notes) notes = join_users(notes) notes = join_vendors(notes) + logger.info(f"Writing {len(notes)} processed notes for the next stage") + write_json_records(notes, NOTES_PATH.name) diff --git a/notes/post.py b/notes/post.py index 9e1302a..d0e5d98 100644 --- a/notes/post.py +++ b/notes/post.py @@ -1,5 +1,6 @@ from dataclasses import dataclass import json +import logging import os import time from typing import Generator @@ -11,6 +12,8 @@ from notes import NOTES_PATH +logger = logging.getLogger("notes/post") + ACCESS_TOKEN = os.environ["SLACK_ACCESS_TOKEN"] CHANNEL = os.environ["SLACK_CHANNEL_ID"] @@ -43,10 +46,13 @@ def read_notes_json() -> list[Note]: notes_data = json.loads(NOTES_PATH.read_text()) + logger.info(f"Read {len(notes_data)} notes from JSON") + return [Note(**note) for note in notes_data] def process_notes(notes: list[Note]) -> list[Note]: + truncated = 0 for note in notes: # collapse all text from the HTML body, joining distinct elements with a space # strip extra whitespace, and place inner newlines within a blockquote @@ -58,9 +64,14 @@ # take the first 2995 characters, plus 3 for the ellipsis, plus 2 for the markdown blockquote and space # 2995 + 3 + 2 = 3000 note.body =
note.body[0:2995] + "..." + truncated += 1 # convert from Unix timestamp milliseconds to Unix timestamp seconds for Slack's date formatting note.created_at = int(int(note.created_at) / 1000) + if truncated > 0: + logger.info(f"Truncated {truncated} long note bodies") + + logger.info(f"Processed {len(notes)} notes for message creation") return notes @@ -73,6 +84,8 @@ def create_messages(notes: list[Note]) -> Generator[dict, None, None]: else: target_name, target_id, target_type, type_id = (note.name_vendor, note.id_vendor, "Vendor", VENDORS) + logger.info(f"Creating [{target_type}] message") + note_id = note.id_note url = f"https://app.hubspot.com/contacts/{HUBSPOT_INSTANCE}/record/{type_id}/{target_id}/view/1?engagement={note_id}" @@ -96,6 +109,7 @@ def create_messages(notes: list[Note]) -> Generator[dict, None, None]: def post_messages(messages: Generator[dict, None, None]) -> Generator[SlackResponse, None, None]: for message in messages: + logger.info("Posting message to Slack") response = slack.chat_postMessage(**message) yield response time.sleep(RATE_LIMIT) @@ -112,3 +126,4 @@ def post_messages(messages: Generator[dict, None, None]) -> Generator[SlackRespo for response in responses: assert response.status_code == 200 + logger.info("Message posted successfully")