Skip to content

Commit

Permalink
Fix: avoid re-processing duplicate notes (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
thekaveman authored Jul 15, 2024
2 parents fde6174 + 48da874 commit b4b8387
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 7 deletions.
1 change: 1 addition & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
dockerfile: .devcontainer/Dockerfile
image: cal-itp/customer-success:dev
entrypoint: []
env_file: .env
command: sleep infinity
ports:
- "8000"
Expand Down
3 changes: 3 additions & 0 deletions notes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import logging
from pathlib import Path
import sys

# Configure package-wide logging once at import time: INFO-level records go to
# stdout with timestamp, level, and logger name (e.g. "notes/download").
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s [%(levelname)s] (%(name)s): %(message)s")

# Shared location of the notes JSON file; written by the download stage and
# read by the post stage (both import NOTES_PATH from this package).
NOTES_PATH = Path("data/notes.json")
45 changes: 38 additions & 7 deletions notes/download.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
from pathlib import Path
import sys
Expand All @@ -9,6 +10,8 @@
from data.utils import chunk_list, hubspot_get_all_pages, hubspot_to_df, write_json_records, HubspotUserApi
from notes import NOTES_PATH

logger = logging.getLogger("notes/download")


ACCESS_TOKEN = os.environ["HUBSPOT_ACCESS_TOKEN"]
MAX_PAGES = int(os.environ.get("HUBSPOT_MAX_PAGES", sys.maxsize))
Expand All @@ -31,18 +34,23 @@

def get_last_note_id():
    """Return the last processed note id from LAST_NOTE_PATH, or None if the file is absent.

    The id is read as text and stripped of surrounding whitespace so it can be
    compared directly against Hubspot note ids.
    """
    try:
        # Defect fixed: the block contained an unconditional `return` of the raw
        # read_text() result followed by unreachable duplicate lines. Keep the
        # single, logged path.
        last_note_id = LAST_NOTE_PATH.read_text(encoding="utf-8").strip()
        logger.info(f"Read last_note_id: {last_note_id}")
        return last_note_id
    except FileNotFoundError:
        # First run (or cleared state): no checkpoint file yet.
        logger.warning("File not found: last_note_id")
        return None


def update_last_note_id(last_note_id):
    """Persist the given note id to LAST_NOTE_PATH as the checkpoint for the next run."""
    logger.info(f"Updating last_note_id: {last_note_id}")
    # Normalize to stripped text so the stored value round-trips cleanly on read.
    checkpoint = str(last_note_id).strip()
    LAST_NOTE_PATH.write_text(checkpoint, encoding="utf-8")


def get_notes() -> pd.DataFrame:
def get_notes(last_note_id: str) -> pd.DataFrame:
logger.info(f"Requesting notes after: {last_note_id}")

note_props = ["hs_created_by", "hs_createdate", "hs_note_body"]
last_note_id = get_last_note_id()

notes_responses = hubspot_get_all_pages(
hubspot_notes_api,
Expand All @@ -52,13 +60,17 @@ def get_notes() -> pd.DataFrame:
properties=note_props,
associations=ASSOCIATION_TYPES,
)
notes_df = hubspot_to_df(notes_responses)
logger.info(f"Received {len(notes_df)} new notes")

update_last_note_id(notes_responses[-1].results[-1].id)

return hubspot_to_df(notes_responses)
return notes_df


def preprocess_notes(notes: pd.DataFrame, last_note_id: str) -> pd.DataFrame:
original_size = len(notes)

def preprocess_notes(notes: pd.DataFrame) -> pd.DataFrame:
# rename vendor association column
# weird name, maybe because it is a custom association type?
if "associations.p5519226_vendors.results" in notes.columns:
Expand All @@ -81,6 +93,9 @@ def preprocess_notes(notes: pd.DataFrame) -> pd.DataFrame:
# and rename some for simplicity
notes = notes.rename(columns=cols, errors="ignore")

# drop the last_note_id if it was included
notes = notes.drop(index=notes.loc[notes["id_note"].eq(last_note_id)].index)

# drop notes without a body
notes = notes.dropna(subset=["body"])

Expand Down Expand Up @@ -129,6 +144,11 @@ def preprocess_notes(notes: pd.DataFrame) -> pd.DataFrame:
notes = notes.drop(columns=ASSOCIATION_COLUMNS, errors="ignore")
notes.reset_index(drop=True, inplace=True)

if original_size != len(notes):
logger.info(f"Filtered {original_size} notes to {len(notes)} with matching criteria")
else:
logger.info(f"All {len(notes)} notes have matching criteria")

return notes


Expand All @@ -155,6 +175,8 @@ def join_companies(notes: pd.DataFrame) -> pd.DataFrame:
)
companies.reset_index(drop=True, inplace=True)

logger.info(f"Joining {len(companies)} company records to notes")

# merge back with the notes DataFrame
# left join since some notes are associated with vendors and not companies
# so want to keep all the notes
Expand Down Expand Up @@ -192,6 +214,8 @@ def join_users(notes: pd.DataFrame) -> pd.DataFrame:
)
users.reset_index(drop=True, inplace=True)

logger.info("Joining user records to notes")

# merge back with the notes DataFrame
notes = notes.merge(users, how="left", on="id_user")
notes.reset_index(drop=True, inplace=True)
Expand Down Expand Up @@ -220,6 +244,8 @@ def join_vendors(notes: pd.DataFrame) -> pd.DataFrame:
)
vendors.reset_index(drop=True, inplace=True)

logger.info(f"Joining {len(vendors)} vendor records to notes")

# merge back with the notes DataFrame
notes = notes.merge(vendors, how="left", on="id_vendor")
notes.reset_index(drop=True, inplace=True)
Expand All @@ -228,11 +254,16 @@ def join_vendors(notes: pd.DataFrame) -> pd.DataFrame:


if __name__ == "__main__":
notes = get_notes()
logger.info("Starting to download Hubspot notes")

notes = preprocess_notes(notes)
last_note_id = get_last_note_id()

notes = get_notes(last_note_id)
notes = preprocess_notes(notes, last_note_id)
notes = join_companies(notes)
notes = join_users(notes)
notes = join_vendors(notes)

logger.info(f"Writing {len(notes)} processed notes for the next stage")

write_json_records(notes, NOTES_PATH.name)
15 changes: 15 additions & 0 deletions notes/post.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import dataclass
import json
import logging
import os
import time
from typing import Generator
Expand All @@ -11,6 +12,8 @@

from notes import NOTES_PATH

logger = logging.getLogger("notes/post")


ACCESS_TOKEN = os.environ["SLACK_ACCESS_TOKEN"]
CHANNEL = os.environ["SLACK_CHANNEL_ID"]
Expand Down Expand Up @@ -43,10 +46,13 @@ def read_notes_json() -> list[Note]:

notes_data = json.loads(NOTES_PATH.read_text())

logging.info(f"Read {len(notes_data)} notes from JSON")

return [Note(**note) for note in notes_data]


def process_notes(notes: list[Note]) -> list[Note]:
truncated = 0
for note in notes:
# collapse all text from the HTML body, joining distinct elements with a space
# strip extra whitespace, and place inner newlines within a blockquote
Expand All @@ -58,9 +64,14 @@ def process_notes(notes: list[Note]) -> list[Note]:
# take the first 2995 characters, plus 3 for the ellipsis, plus 2 for the markdown blockquote and space
# 2995 + 3 + 2 = 3000
note.body = note.body[0:2995] + "..."
truncated += 1
# convert from Unix timestamp milliseconds to Unix timestamp seconds for Slack's date formatting
note.created_at = int(int(note.created_at) / 1000)

if truncated > 0:
logger.info(f"Truncated {truncated} long note bodies")

logger.info(f"Processed {len(notes)} notes for message creation")
return notes


Expand All @@ -73,6 +84,8 @@ def create_messages(notes: list[Note]) -> Generator[dict, None, None]:
else:
target_name, target_id, target_type, type_id = (note.name_vendor, note.id_vendor, "Vendor", VENDORS)

logger.info(f"Creating [{target_type}] message")

note_id = note.id_note
url = f"https://app.hubspot.com/contacts/{HUBSPOT_INSTANCE}/record/{type_id}/{target_id}/view/1?engagement={note_id}"

Expand All @@ -96,6 +109,7 @@ def create_messages(notes: list[Note]) -> Generator[dict, None, None]:

def post_messages(messages: Generator[dict, None, None]) -> Generator[SlackResponse, None, None]:
    """Lazily post each prepared message to Slack, yielding the API response for each.

    Sleeps RATE_LIMIT seconds after every post to stay under Slack's rate limits.
    """
    for payload in messages:
        logger.info("Posting message to Slack")
        yield slack.chat_postMessage(**payload)
        time.sleep(RATE_LIMIT)
Expand All @@ -112,3 +126,4 @@ def post_messages(messages: Generator[dict, None, None]) -> Generator[SlackRespo

for response in responses:
assert response.status_code == 200
logger.info("Message posted successfully")

0 comments on commit b4b8387

Please sign in to comment.