Multi-feed/cloud rework (#8)
* Adds Azure feed support

Including a full refactor for multiple-feed support with Function reuse, and decoupling the `RssPoller` Function from the Timer trigger to allow multiple input events

* Adds article timestamp logging

* Improves function READMEs

* Optimises `PostHandler` imports
Phuurl authored Apr 21, 2022
1 parent 9a6c161 · commit eec2e1c
Showing 26 changed files with 295 additions and 140 deletions.
Binary file modified .img/cloud-news-summarizer.png
25 changes: 16 additions & 9 deletions .img/diagram.py
@@ -7,25 +7,32 @@

 with Diagram("cloud-news-summarizer", show=False, outformat="png"):
     with Cluster("{prefix}cloudnews\nStorage Account"):
-        queue = QueuesStorage("processqueue\nQueue")
-        poison_queue = QueuesStorage("processqueue-poison\nQueue")
+        timer_queue = QueuesStorage("timerqueue\nQueue")
+        timer_poison_queue = QueuesStorage("timerqueue-poison\nQueue")
+        process_queue = QueuesStorage("processqueue\nQueue")
+        process_poison_queue = QueuesStorage("processqueue-poison\nQueue")
         table = TableStorage("rsscheckpoint\nTable")

     timer = Alarm("30m schedule trigger")
-    rss = FunctionApps("AwsRssChecker\nFunction")
-    processor = FunctionApps("AwsPostHandler\nFunction")
+    timer_func = FunctionApps("TimerTrigger\nFunction")
+    rss = FunctionApps("RssPoller\nFunction")
+    processor = FunctionApps("PostHandler\nFunction")
     failure = FunctionApps("FailureHandler\nFunction")

     cog = CognitiveServices("{prefix}-cloud-news-cognitive\nCognitive Service")
     slack = Slack("Slack notification")

-    timer >> rss
+    timer >> timer_func
+    timer_func >> timer_queue
+    timer_queue >> rss
     rss >> table
     rss << table
-    rss >> queue
-    queue >> processor
-    processor >> poison_queue
+    rss >> process_queue
+    rss >> timer_poison_queue
+    process_queue >> processor
+    processor >> process_poison_queue
     processor >> cog
     processor << cog
-    poison_queue >> failure
+    process_poison_queue >> failure
+    timer_poison_queue >> failure
     [processor, failure] >> slack
16 changes: 11 additions & 5 deletions README.md
@@ -14,21 +14,27 @@ It is expected that both the Function and the Language Cognitive Services usage

 Supporting resources are maintained in IaC with Pulumi.
 1. Head to the [pulumi/](./pulumi) directory and follow the instructions to deploy that stack.
 2. Deploy the Azure Functions in the [functions/](./functions) directory using either the Functions CLI or the VS Code extension, choosing the `python3.9` runtime. Ensure that you set the required Application Settings as detailed below in the deployed Function App resource.
-   - Note: If you deployed the Functions before setting the below config in the Function App, you may need to redeploy the functions for it to take effect
+   - Note: If you deployed the Functions before setting the below config in the Function App resource, you may need to redeploy the functions for it to take effect.

 The following Application Settings are required to be present on the deployed Function App:
+- `AWS_ENABLED`: whether to poll for AWS news - set to `1` or `true` to enable
+- `AZURE_ENABLED`: whether to poll for Azure news - set to `1` or `true` to enable
 - `TABLE_SA_CONNECTION`: connection string for the storage account created in Pulumi - _available in your Storage Account resource in the Portal_
 - `TABLE_NAME`: table name within the storage account - _listed as a Pulumi output_
 - `ENVIRONMENT`: table row key - _a string to differentiate multiple deployments; can be anything alphanumeric, eg `prod`_
-- `QUEUE_NAME`: queue name within storage account - _listed as a Pulumi output_
+- `TIMER_QUEUE_NAME`: timer queue name within the storage account - _listed as a Pulumi output_
+- `PROCESS_QUEUE_NAME`: processing queue name within the storage account - _listed as a Pulumi output_
 - `COGNITIVE_ENDPOINT`: endpoint URL (including `https://`) for the Cognitive Services resource - _listed as a Pulumi output_
-- `COGNITIVE_KEY`: key for the cognitive services resource - _available in your Language resource in the Portal_
-- `SLACK_WEBHOOK`: webhook URL for sending to Slack - _see the [Slack docs](https://api.slack.com/messaging/webhooks) if you aren't sure_
+- `COGNITIVE_KEY`: key for the Cognitive Services resource - _available in your Language resource in the Portal_
+- `AWS_SLACK_WEBHOOK`: webhook URL for sending AWS news to Slack (not required if `AWS_ENABLED` is unset) - _see the [Slack docs](https://api.slack.com/messaging/webhooks) if you aren't sure_
+- `AZURE_SLACK_WEBHOOK`: webhook URL for sending Azure news to Slack (not required if `AZURE_ENABLED` is unset) - _see the [Slack docs](https://api.slack.com/messaging/webhooks) if you aren't sure_
 - `SLACK_FAILURE_WEBHOOK`: webhook URL for processing-failure alerts to Slack - _can be the same as or different to the normal Slack webhook (ie optionally send failures to a different channel)_
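
For illustration, here is a minimal sketch of how a Function could consume these settings, following the `1`/`true` convention above (the `_flag` helper is hypothetical, not part of this repo):

```python
import os


def _flag(name: str) -> bool:
    # AWS_ENABLED / AZURE_ENABLED count as enabled when set to "1" or "true"
    return os.getenv(name, "").lower() in ("1", "true")


AWS_ENABLED = _flag("AWS_ENABLED")
AZURE_ENABLED = _flag("AZURE_ENABLED")

# Required settings: fail fast with a KeyError if missing from the Function App
TABLE_SA_CONNECTION = os.environ["TABLE_SA_CONNECTION"]
TABLE_NAME = os.environ["TABLE_NAME"]
ENVIRONMENT = os.environ["ENVIRONMENT"]
```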

 ## Current feeds supported
 Now:
 - AWS What's New (https://aws.amazon.com/about-aws/whats-new/recent/feed/)
+- Azure Updates (https://azurecomcdn.azureedge.net/en-gb/updates/feed/)

 Next:
-- Azure Updates (https://azurecomcdn.azureedge.net/en-gb/updates/feed/)
 - GCP News (https://cloudblog.withgoogle.com/rss/)

2 changes: 0 additions & 2 deletions functions/AwsPostHandler/README.md

This file was deleted.

1 change: 0 additions & 1 deletion functions/AwsPostHandler/sample.dat

This file was deleted.

4 changes: 0 additions & 4 deletions functions/AwsRssChecker/README.md

This file was deleted.

100 changes: 0 additions & 100 deletions functions/AwsRssChecker/__init__.py

This file was deleted.

Empty file removed functions/AwsRssChecker/sample.dat
2 changes: 0 additions & 2 deletions functions/FailureHandler/README.md

This file was deleted.

1 change: 0 additions & 1 deletion functions/FailureHandler/sample.dat

This file was deleted.

8 changes: 8 additions & 0 deletions functions/PostHandler/README.md
@@ -0,0 +1,8 @@
# PostHandler
Uses BeautifulSoup to parse news posts picked up from the `PROCESS_QUEUE_NAME` storage queue, sends the main post content to Azure's text summarization service, and posts the result to Slack (with the `{CLOUD}_SLACK_WEBHOOK` environment variable).

Expects queue messages in the format `cloud§articleurl` (parsing sketched below).

Currently supports:
- AWS articles: https://aws.amazon.com/new/
- Azure articles: https://azure.microsoft.com/en-gb/updates/
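
As an aside, a minimal sketch of parsing such a message, assuming the `§` delimiter used throughout this repo (the `parse_message` helper is illustrative, not part of the codebase):

```python
DELIMITER = "§"


def parse_message(raw: str) -> tuple:
    # "aws§https://aws.amazon.com/..." -> ("aws", "https://aws.amazon.com/...")
    cloud, article_url = raw.split(DELIMITER, 1)
    return cloud, article_url


print(parse_message("aws§https://aws.amazon.com/new/"))
```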
@@ -11,7 +11,35 @@
 # Gather required environment variables
 COGNITIVE_ENDPOINT = os.environ["COGNITIVE_ENDPOINT"]
 COGNITIVE_KEY = os.environ["COGNITIVE_KEY"]
-SLACK_WEBHOOK = os.environ["SLACK_WEBHOOK"]

+# Optional environment variables
+AWS_SLACK_WEBHOOK = os.getenv("AWS_SLACK_WEBHOOK", "")
+AZURE_SLACK_WEBHOOK = os.getenv("AZURE_SLACK_WEBHOOK", "")
+
+DELIMITER = "§"
+
+
+def aws_article_text(soup: BeautifulSoup) -> dict:
+    article_title = soup.title.text
+    article_paragraphs = soup.find_all("div", class_="aws-text-box")
+
+    article_text = ""
+    for p in article_paragraphs:
+        article_text += (p.text.replace("\n", "") + "\n")
+
+    return {"title": article_title, "text": article_text}
+
+
+def azure_article_text(soup: BeautifulSoup) -> dict:
+    article_title = soup.title.text.split("|")
+    article_div = soup.find("div", class_="row-divided")
+    article_p = article_div.find_all("p")
+
+    article_text = ""
+    for p in article_p:
+        article_text += (p.text.replace("\n", "") + "\n")
+
+    return {"title": article_title[0], "text": article_text}


 def create_client() -> textanalytics.TextAnalyticsClient:
@@ -20,34 +20,43 @@ def create_client() -> textanalytics.TextAnalyticsClient:
     return client


+# Expects message in format `cloudname§articleurl`
 def main(msg: func.QueueMessage) -> None:
     message = msg.get_body().decode("utf-8")
+    parsed_message = message.split(DELIMITER)
     logging.info("Processing {}".format(message))
+    cloud = parsed_message[0]
+    article_url = parsed_message[1]

-    entry = requests.get(message)
+    entry = requests.get(article_url)
     if entry.status_code != 200:
         logging.error(entry.text)
-        raise Exception("Non-200 response {} from target: {}".format(entry.status_code, message))
+        raise Exception("Non-200 response {} from target: {}".format(entry.status_code, article_url))

     soup = BeautifulSoup(entry.text, "html.parser")
-    article_paragraphs = soup.find_all("div", class_="aws-text-box")
-    article_title = soup.title.text

-    article_text = ""
-    for paragraph in article_paragraphs:
-        article_text += (paragraph.text.replace("\n", "") + "\n")
+    if cloud == "aws":
+        if AWS_SLACK_WEBHOOK == "":
+            raise Exception("{} article, but AWS_SLACK_WEBHOOK unset".format(cloud))
+        article = aws_article_text(soup)
+    elif cloud == "azure":
+        if AZURE_SLACK_WEBHOOK == "":
+            raise Exception("{} article, but AZURE_SLACK_WEBHOOK unset".format(cloud))
+        article = azure_article_text(soup)
+    else:
+        logging.error("Unexpected cloud: {}".format(cloud))
+        raise NotImplementedError

-    if article_text != "":
+    if article["text"] != "":
         summarise_client = create_client()
         poller = summarise_client.begin_analyze_actions(
-            documents=[article_text],
+            documents=[article["text"]],
             actions=[textanalytics.ExtractSummaryAction(max_sentence_count=3)])

         summarise_results = poller.result()
         for result in summarise_results:
             if result[0].is_error:
-                logging.error("Summarisation error: code {}, message {}".format(result[0].code, result[0].message))
-                raise Exception("Summarisation failure")
+                logging.error("Summarization error: code {}, message {}".format(result[0].code, result[0].message))
+                raise Exception("Summarization failure")
             else:
                 logging.info("Summary:\n{}".format(" ".join([sentence.text for sentence in result[0].sentences])))
                 slack_blocks = {
@@ -56,7 +93,7 @@ def main(msg: func.QueueMessage) -> None:
                         "type": "section",
                         "text": {
                             "type": "mrkdwn",
-                            "text": "*<{}|{}>*".format(message, article_title)
+                            "text": "*<{}|{}>*".format(article_url, article["title"])
                         }
                     },
                     {
@@ -68,7 +105,11 @@ def main(msg: func.QueueMessage) -> None:
                     }
                 ]
             }
-            slack_response = requests.post(SLACK_WEBHOOK, json.dumps(slack_blocks))
+            if cloud == "aws":
+                slack_response = requests.post(AWS_SLACK_WEBHOOK, json.dumps(slack_blocks))
+            elif cloud == "azure":
+                slack_response = requests.post(AZURE_SLACK_WEBHOOK, json.dumps(slack_blocks))

             if slack_response.status_code != 200:
                 logging.warning("Non-200 from Slack: {} {}".format(slack_response.status_code, slack_response.text))
                 raise Exception("Failed to send to Slack")
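
Pulled out of the diff above for reference, a self-contained sketch of the extractive-summarisation call, assuming the preview `azure-ai-textanalytics` SDK and key-based auth (the credential wiring is an assumption, since `create_client` is collapsed above):

```python
import os

from azure.ai import textanalytics
from azure.core.credentials import AzureKeyCredential

# Endpoint and key come from the Application Settings described in the README
client = textanalytics.TextAnalyticsClient(
    endpoint=os.environ["COGNITIVE_ENDPOINT"],
    credential=AzureKeyCredential(os.environ["COGNITIVE_KEY"]))

poller = client.begin_analyze_actions(
    documents=["Some article text to summarise."],
    actions=[textanalytics.ExtractSummaryAction(max_sentence_count=3)])

# One result list per document; each list holds one entry per action
for result in poller.result():
    if not result[0].is_error:
        print(" ".join(sentence.text for sentence in result[0].sentences))
```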
File renamed without changes.
2 changes: 2 additions & 0 deletions functions/ProcessFailureHandler/README.md
@@ -0,0 +1,2 @@
# ProcessFailureHandler
Monitors the Azure-generated `-poison` dead-letter queue for processing failures from the [PostHandler](../PostHandler) Function, and sends them to Slack (via `SLACK_FAILURE_WEBHOOK`) when found.
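
A minimal sketch of what such a handler could look like, assuming a queue trigger bound to the poison queue and the `SLACK_FAILURE_WEBHOOK` setting from the README (the exact Slack payload shape here is illustrative):

```python
import json
import logging
import os

import azure.functions as func
import requests

SLACK_FAILURE_WEBHOOK = os.environ["SLACK_FAILURE_WEBHOOK"]


def main(msg: func.QueueMessage) -> None:
    # Relay the poisoned queue message to Slack so failures are visible
    body = msg.get_body().decode("utf-8")
    payload = {"text": "cloud-news-summarizer processing failure: {}".format(body)}

    response = requests.post(SLACK_FAILURE_WEBHOOK, json.dumps(payload))
    if response.status_code != 200:
        logging.warning("Non-200 from Slack: {} {}".format(response.status_code, response.text))
```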
File renamed without changes.
File renamed without changes.
6 changes: 6 additions & 0 deletions functions/RssPoller/README.md
@@ -0,0 +1,6 @@
# RssPoller
Polls the RSS feed given by each message on its input queue (`INPUT_QUEUE_NAME`) to check for new articles. If any new articles are found, adds them to the `PROCESS_QUEUE_NAME` queue for processing by the `PostHandler` Function.

Stores the latest feed and article timestamps for each feed type in the `TABLE_NAME` Azure Storage Table to avoid duplication.

Expects queue messages in the format `cloud§rssfeedurl` - eg `aws§https://aws.amazon.com/about-aws/whats-new/recent/feed/`
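
To exercise the poller by hand, one could drop such a message onto its input queue; here is a sketch assuming the `azure-storage-queue` SDK and the connection string from the Application Settings (note that Azure Functions queue triggers typically expect Base64-encoded message bodies, hence the encoding step):

```python
import base64
import os

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(
    os.environ["TABLE_SA_CONNECTION"], os.environ["TIMER_QUEUE_NAME"])

message = "aws§https://aws.amazon.com/about-aws/whats-new/recent/feed/"
# Base64-encode to match the default Functions queue-trigger message encoding
queue.send_message(base64.b64encode(message.encode("utf-8")).decode("utf-8"))
```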