diff --git a/.img/cloud-news-summarizer.png b/.img/cloud-news-summarizer.png index fa7a7a6..8017982 100644 Binary files a/.img/cloud-news-summarizer.png and b/.img/cloud-news-summarizer.png differ diff --git a/.img/diagram.py b/.img/diagram.py index e561b66..f010241 100644 --- a/.img/diagram.py +++ b/.img/diagram.py @@ -7,25 +7,32 @@ with Diagram("cloud-news-summarizer", show=False, outformat="png"): with Cluster("{prefix}cloudnews\nStorage Account"): - queue = QueuesStorage("processqueue\nQueue") - poison_queue = QueuesStorage("processqueue-poison\nQueue") + timer_queue = QueuesStorage("timerqueue\nQueue") + timer_poison_queue = QueuesStorage("timerqueue-poison\nQueue") + process_queue = QueuesStorage("processqueue\nQueue") + process_poison_queue = QueuesStorage("processqueue-poison\nQueue") table = TableStorage("rsscheckpoint\nTable") timer = Alarm("30m schedule trigger") - rss = FunctionApps("AwsRssChecker\nFunction") - processor = FunctionApps("AwsPostHandler\nFunction") + timer_func = FunctionApps("TimerTrigger\nFunction") + rss = FunctionApps("RssPoller\nFunction") + processor = FunctionApps("PostHandler\nFunction") failure = FunctionApps("FailureHandler\nFunction") cog = CognitiveServices("{prefix}-cloud-news-cognitive\nCognitive Service") slack = Slack("Slack notification") - timer >> rss + timer >> timer_func + timer_func >> timer_queue + timer_queue >> rss rss >> table rss << table - rss >> queue - queue >> processor - processor >> poison_queue + rss >> process_queue + rss >> timer_poison_queue + process_queue >> processor + processor >> process_poison_queue processor >> cog processor << cog - poison_queue >> failure + process_poison_queue >> failure + timer_poison_queue >> failure [processor, failure] >> slack diff --git a/README.md b/README.md index 15da2f8..e2cf02d 100644 --- a/README.md +++ b/README.md @@ -14,21 +14,27 @@ It is expected that both the Function and the Language Cognitive Services usage Supporting resources are maintained in IaC with Pulumi. 1. Head to the [pulumi/](./pulumi) directory and follow the instructions to deploy that stack. 2. Deploy the Azure Functions in the [functions/](./functions) directory using either the Functions CLI, or the VS Code extension, choosing the `python3.9` runtime. Ensure that you set the required Applications Settings as detailed below in the deployed Function App resource. - - Note: If you deployed the Functions before setting the below config in the Function App, you may need to redeploy the functions for it to take effect + - Note: If you deployed the Functions before setting the below config in the Function App resource, you may need to redeploy the functions for it to take effect. 
The following Application Settings are required to be present on the deployed Function App: +- `AWS_ENABLED`: whether to poll for AWS news - set to `1` or `true` to enable +- `AZURE_ENABLED`: whether to poll for Azure news - set to `1` or `true` to enable - `TABLE_SA_CONNECTION`: connection string for storage account created in Pulumi - _available in your Storage Account resource in the Portal_ - `TABLE_NAME`: table name within storage account - _listed as a Pulumi output_ - `ENVIRONMENT`: table row key - _string to differentiate multiple deployments, can be anything alphanumeric, eg `prod`_ -- `QUEUE_NAME`: queue name within storage account - _listed as a Pulumi output_ +- `TIMER_QUEUE_NAME`: timer queue name within storage account - _listed as a Pulumi output_ +- `PROCESS_QUEUE_NAME`: processing queue name within storage account - _listed as a Pulumi output_ - `COGNITIVE_ENDPOINT`: endpoint URL (including `https://`) for the cognitive services resource - _listed as a Pulumi output_ -- `COGNITIVE_KEY`: key for the cognitive services resource - _available in your Language resource in the Portal_ -- `SLACK_WEBHOOK`: webhook URL for sending to Slack - _see the [Slack docs](https://api.slack.com/messaging/webhooks) if you aren't sure_ +- `COGNITIVE_KEY`: key for the Cognitive Services resource - _available in your Language resource in the Portal_ +- `AWS_SLACK_WEBHOOK`: webhook URL for sending AWS news to Slack (not required if `AWS_ENABLED` is unset) - _see the [Slack docs](https://api.slack.com/messaging/webhooks) if you aren't sure_ +- `AZURE_SLACK_WEBHOOK`: webhook URL for sending Azure news to Slack (not required if `AZURE_ENABLED` is unset) - _see the [Slack docs](https://api.slack.com/messaging/webhooks) if you aren't sure_ - `SLACK_FAILURE_WEBHOOK`: webhook URL for processing failure alerts to Slack - _can be the same or different to the normal Slack webhook (ie optionally send failures to a different channel)_ ## Current feeds supported Now: - AWS What's New (https://aws.amazon.com/about-aws/whats-new/recent/feed/) +- Azure Updates (https://azurecomcdn.azureedge.net/en-gb/updates/feed/) Next: -- Azure Updates (https://azurecomcdn.azureedge.net/en-gb/updates/feed/) +- GCP News (https://cloudblog.withgoogle.com/rss/) + diff --git a/functions/AwsPostHandler/README.md b/functions/AwsPostHandler/README.md deleted file mode 100644 index 569609e..0000000 --- a/functions/AwsPostHandler/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# AwsPostHandler -Uses BeautifulSoup to parse AWS news posts picked up from the `QUEUE_NAME` stroage queue, sends the main post content to Azure's text summarization service, and posts the result to Slack (with `SLACK_WEBHOOK`). \ No newline at end of file diff --git a/functions/AwsPostHandler/sample.dat b/functions/AwsPostHandler/sample.dat deleted file mode 100644 index f05bb51..0000000 --- a/functions/AwsPostHandler/sample.dat +++ /dev/null @@ -1 +0,0 @@ -sample queue data \ No newline at end of file diff --git a/functions/AwsRssChecker/README.md b/functions/AwsRssChecker/README.md deleted file mode 100644 index fdd2a41..0000000 --- a/functions/AwsRssChecker/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# AwsRssChecker -Polls the _AWS What's New_ RSS feed on a regular interval (30-minute timer by default) to check for new articles. If any found, add them to the `QUEUE_NAME` queue for processing by the `AwsPostHandler` function. - -Stores the last successful check timestamp in an Azure Storage Table `TABLE_NAME` to avoid duplication. 
\ No newline at end of file diff --git a/functions/AwsRssChecker/__init__.py b/functions/AwsRssChecker/__init__.py deleted file mode 100644 index 9974cd3..0000000 --- a/functions/AwsRssChecker/__init__.py +++ /dev/null @@ -1,100 +0,0 @@ -import logging -import os -import time -from typing import Union - -import azure.data.tables as tables -import azure.functions as func -import azure.storage.queue as queue -import feedparser - -FEED_URL = "https://aws.amazon.com/about-aws/whats-new/recent/feed/" - -# Gather required environment variables -CONNECTION_STRING = os.environ["TABLE_SA_CONNECTION"] -TABLE_NAME = os.environ["TABLE_NAME"] -QUEUE_NAME = os.environ["QUEUE_NAME"] -ENVIRONMENT = os.environ["ENVIRONMENT"] - -FEED_CHECKPOINT = "aws-latest-feed" -ARTICLE_CHECKPOINT = "aws-latest-article" - - -def get_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> Union[float, None]: - try: - table_service = tables.TableServiceClient.from_connection_string(connection_string) - table_client = table_service.get_table_client(table_name) - checkpoint = table_client.get_entity(partition_key=partition_key, row_key=row_key) - return float(checkpoint["ts"]) - except Exception as e: - logging.warning("Exception getting checkpoint: {}".format(e)) - return None - - -def set_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str, ts: float) -> None: - try: - table_service = tables.TableServiceClient.from_connection_string(connection_string) - table_client = table_service.get_table_client(table_name) - checkpoint_out = { - "PartitionKey": partition_key, - "RowKey": row_key, - "ts": str(ts) - } - table_client.upsert_entity(checkpoint_out) - except Exception as e: - logging.warning("Exception setting checkpoint: {}".format(e)) - - -def get_rss(url: str, last_run: time.struct_time) -> Union[feedparser.FeedParserDict, None]: - feed = feedparser.parse(url) - try: - if feed.feed.published_parsed > last_run: - return feed - else: - logging.info("Feed not updated since last check") - return None - except Exception as e: - logging.warning("Exception checking feed publish timestamp: {}".format(e)) - return feed - - -def process_entry(entry: feedparser.util.FeedParserDict, last_run: time.struct_time, - queue_client: queue.QueueClient) -> None: - if entry.published_parsed > last_run: - logging.info("New entry: {} {}".format(entry.title, entry.link)) - queue_client.send_message(bytes(entry.link, "utf-8")) - logging.info("Added {} to queue".format(entry.link)) - - -def main(timer: func.TimerRequest) -> None: - feed_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, FEED_CHECKPOINT, ENVIRONMENT) - article_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, ARTICLE_CHECKPOINT, ENVIRONMENT) - if feed_checkpoint is not None: - logging.info("Using {} as FEED checkpoint".format(feed_checkpoint)) - feed = get_rss(FEED_URL, time.gmtime(feed_checkpoint)) - else: - logging.info("No FEED checkpoint - using current time minus 30m") - feed = get_rss(FEED_URL, time.gmtime(time.time() - (30 * 60))) - - if feed is not None: - if article_checkpoint is not None: - logging.info("Using {} as ARTICLE checkpoint".format(feed_checkpoint)) - else: - logging.info("No ARTICLE checkpoint - using current time minus 30m") - - queue_client = queue.QueueClient.from_connection_string(CONNECTION_STRING, QUEUE_NAME, - message_encode_policy=queue.BinaryBase64EncodePolicy(), - message_decode_policy=queue.BinaryBase64DecodePolicy()) - latest_article = time.localtime(0) - for entry in 
feed.entries: -        if article_checkpoint is not None: -            process_entry(entry, time.gmtime(article_checkpoint), queue_client) -        else: -            process_entry(entry, time.gmtime(time.time() - (30 * 60)), queue_client) -        if entry.published_parsed > latest_article: -            latest_article = entry.published_parsed - -    set_checkpoint(CONNECTION_STRING, TABLE_NAME, ARTICLE_CHECKPOINT, ENVIRONMENT, -                   time.mktime(feed.feed.published_parsed)) -    set_checkpoint(CONNECTION_STRING, TABLE_NAME, FEED_CHECKPOINT, ENVIRONMENT, -                   time.mktime(feed.feed.published_parsed)) diff --git a/functions/AwsRssChecker/sample.dat b/functions/AwsRssChecker/sample.dat deleted file mode 100644 index e69de29..0000000 diff --git a/functions/FailureHandler/README.md b/functions/FailureHandler/README.md deleted file mode 100644 index c4ae18d..0000000 --- a/functions/FailureHandler/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# FailureHandler -Function to monitor the `-poison` Azure-generated dead-letter queue for processing failures, and send them to Slack (with `SLACK_FAILURE_WEBHOOK`) when found. diff --git a/functions/FailureHandler/sample.dat b/functions/FailureHandler/sample.dat deleted file mode 100644 index f05bb51..0000000 --- a/functions/FailureHandler/sample.dat +++ /dev/null @@ -1 +0,0 @@ -sample queue data \ No newline at end of file diff --git a/functions/PostHandler/README.md b/functions/PostHandler/README.md new file mode 100644 index 0000000..ac904ee --- /dev/null +++ b/functions/PostHandler/README.md @@ -0,0 +1,8 @@ +# PostHandler +Uses BeautifulSoup to parse news posts picked up from the `PROCESS_QUEUE_NAME` storage queue, sends the main post content to Azure's text summarization service, and posts the result to Slack (with the `{CLOUD}_SLACK_WEBHOOK` environment variable). + +Expects an input from the queue message in the format `cloud§articleurl`.
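For illustration, a minimal sketch of that message contract: the `§` delimiter and the `cloud`/article-URL ordering mirror `DELIMITER` and `main()` in the Function code, while the helper names and the example URL are hypothetical.

```python
# Sketch of the queue-message contract shared by RssPoller (producer) and PostHandler (consumer).
# The "§" delimiter and field order come from the Functions' code; helper names are illustrative only.
DELIMITER = "§"


def encode_article_message(cloud: str, article_url: str) -> bytes:
    """Build the payload RssPoller places on the processing queue, e.g. b'aws§https://...'."""
    return bytes("{}{}{}".format(cloud, DELIMITER, article_url), "utf-8")


def decode_article_message(body: bytes) -> tuple:
    """Split a payload back into (cloud, article_url), as PostHandler's main() does."""
    cloud, article_url = body.decode("utf-8").split(DELIMITER)
    return cloud, article_url


if __name__ == "__main__":
    payload = encode_article_message("aws", "https://aws.amazon.com/about-aws/whats-new/example-post/")
    print(decode_article_message(payload))  # ('aws', 'https://aws.amazon.com/about-aws/whats-new/example-post/')
```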
+ +Currently supports: +- AWS articles: https://aws.amazon.com/new/ +- Azure articles: https://azure.microsoft.com/en-gb/updates/ \ No newline at end of file diff --git a/functions/AwsPostHandler/__init__.py b/functions/PostHandler/__init__.py similarity index 55% rename from functions/AwsPostHandler/__init__.py rename to functions/PostHandler/__init__.py index e90397c..0025917 100644 --- a/functions/AwsPostHandler/__init__.py +++ b/functions/PostHandler/__init__.py @@ -11,7 +11,35 @@ # Gather required environment variables COGNITIVE_ENDPOINT = os.environ["COGNITIVE_ENDPOINT"] COGNITIVE_KEY = os.environ["COGNITIVE_KEY"] -SLACK_WEBHOOK = os.environ["SLACK_WEBHOOK"] + +# Optional environment variables +AWS_SLACK_WEBHOOK = os.getenv("AWS_SLACK_WEBHOOK", "") +AZURE_SLACK_WEBHOOK = os.getenv("AZURE_SLACK_WEBHOOK", "") + +DELIMITER = "§" + + +def aws_article_text(soup: BeautifulSoup) -> dict: + article_title = soup.title.text + article_paragraphs = soup.find_all("div", class_="aws-text-box") + + article_text = "" + for p in article_paragraphs: + article_text += (p.text.replace("\n", "") + "\n") + + return {"title": article_title, "text": article_text} + + +def azure_article_text(soup: BeautifulSoup) -> dict: + article_title = soup.title.text.split("|") + article_div = soup.find("div", class_="row-divided") + article_p = article_div.find_all("p") + + article_text = "" + for p in article_p: + article_text += (p.text.replace("\n", "") + "\n") + + return {"title": article_title[0], "text": article_text} def create_client() -> textanalytics.TextAnalyticsClient: @@ -20,34 +48,43 @@ def create_client() -> textanalytics.TextAnalyticsClient: return client +# Expects message in format `cloudname§feedurl` def main(msg: func.QueueMessage) -> None: message = msg.get_body().decode("utf-8") + parsed_message = message.split(DELIMITER) logging.info("Processing {}".format(message)) + cloud = parsed_message[0] + article_url = parsed_message[1] - entry = requests.get(message) + entry = requests.get(article_url) if entry.status_code != 200: logging.error(entry.text) - raise Exception("Non-200 response {} from target: {}".format(entry.status_code, message)) + raise Exception("Non-200 response {} from target: {}".format(entry.status_code, article_url)) soup = BeautifulSoup(entry.text, "html.parser") - article_paragraphs = soup.find_all("div", class_="aws-text-box") - article_title = soup.title.text - - article_text = "" - for paragraph in article_paragraphs: - article_text += (paragraph.text.replace("\n", "") + "\n") + if cloud == "aws": + if AWS_SLACK_WEBHOOK == "": + raise Exception("{} article, but AWS_SLACK_WEBHOOK unset". format(cloud)) + article = aws_article_text(soup) + elif cloud == "azure": + if AZURE_SLACK_WEBHOOK == "": + raise Exception("{} article, but AZURE_SLACK_WEBHOOK unset". 
format(cloud)) +        article = azure_article_text(soup) +    else: +        logging.error("Unexpected cloud: {}".format(cloud)) +        raise NotImplementedError - -    if article_text != "": +    if article["text"] != "": summarise_client = create_client() poller = summarise_client.begin_analyze_actions( -            documents=[article_text], +            documents=[article["text"]], actions=[textanalytics.ExtractSummaryAction(max_sentance_count=3)]) summarise_results = poller.result() for result in summarise_results: if result[0].is_error: -            logging.error("Summarisation error: code {}, message {}".format(result[0].code, result[0].message)) -            raise Exception("Summarisation failure") +            logging.error("Summarization error: code {}, message {}".format(result[0].code, result[0].message)) +            raise Exception("Summarization failure") else: logging.info("Summary:\n{}".format(" ".join([sentence.text for sentence in result[0].sentences]))) slack_blocks = { @@ -56,7 +93,7 @@ def main(msg: func.QueueMessage) -> None: "type": "section", "text": { "type": "mrkdwn", - "text": "*<{}|{}>*".format(message, article_title) + "text": "*<{}|{}>*".format(article_url, article["title"]) } }, { @@ -68,7 +105,11 @@ def main(msg: func.QueueMessage) -> None: } ] } - slack_response = requests.post(SLACK_WEBHOOK, json.dumps(slack_blocks)) + if cloud == "aws": + slack_response = requests.post(AWS_SLACK_WEBHOOK, json.dumps(slack_blocks)) + elif cloud == "azure": + slack_response = requests.post(AZURE_SLACK_WEBHOOK, json.dumps(slack_blocks)) + if slack_response.status_code != 200: logging.warning("Non-200 from Slack: {} {}".format(slack_response.status_code, slack_response.text)) raise Exception("Failed to send to Slack") diff --git a/functions/AwsPostHandler/function.json b/functions/PostHandler/function.json similarity index 100% rename from functions/AwsPostHandler/function.json rename to functions/PostHandler/function.json diff --git a/functions/ProcessFailureHandler/README.md b/functions/ProcessFailureHandler/README.md new file mode 100644 index 0000000..32f1956 --- /dev/null +++ b/functions/ProcessFailureHandler/README.md @@ -0,0 +1,2 @@ +# ProcessFailureHandler +Function to monitor the `-poison` Azure-generated dead-letter queue for processing failures (from the [PostHandler](../PostHandler) Function), and send them to Slack (with `SLACK_FAILURE_WEBHOOK`) when found. diff --git a/functions/FailureHandler/__init__.py b/functions/ProcessFailureHandler/__init__.py similarity index 100% rename from functions/FailureHandler/__init__.py rename to functions/ProcessFailureHandler/__init__.py diff --git a/functions/FailureHandler/function.json b/functions/ProcessFailureHandler/function.json similarity index 100% rename from functions/FailureHandler/function.json rename to functions/ProcessFailureHandler/function.json diff --git a/functions/RssPoller/README.md b/functions/RssPoller/README.md new file mode 100644 index 0000000..7f43124 --- /dev/null +++ b/functions/RssPoller/README.md @@ -0,0 +1,6 @@ +# RssPoller +Polls the RSS feed named in the incoming queue message (read from the `timerqueue` queue) to check for new articles. If any new articles are found, adds them to the `PROCESS_QUEUE_NAME` queue for processing by the `PostHandler` function. + +Stores the latest feed and article timestamp for each feed type in an Azure Storage Table `TABLE_NAME` to avoid duplication.
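For context, the checkpoint entities behind this de-duplication look roughly as follows; the partition/row keys and the stringified `ts` value mirror `set_checkpoint()` in the Function code, while the connection string and the `prod` environment are placeholder values.

```python
# Sketch of one checkpoint entity as RssPoller stores it (see get_checkpoint/set_checkpoint).
# Placeholders: the connection string and the "prod" row key; the table name comes from Pulumi.
import time

import azure.data.tables as tables

connection_string = "<TABLE_SA_CONNECTION value>"   # placeholder
table_client = tables.TableServiceClient.from_connection_string(connection_string) \
    .get_table_client("rsscheckpoint")

entity = {
    "PartitionKey": "aws-latest-article",   # "{cloud}-{checkpoint key}"
    "RowKey": "prod",                       # the ENVIRONMENT app setting
    "ts": str(time.time()),                 # epoch seconds, stored as a string
}
table_client.upsert_entity(entity)          # same call set_checkpoint() makes
```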
+ +Expects an input from the queue message in the format `cloud§rssfeedurl` - eg `aws§https://aws.amazon.com/about-aws/whats-new/recent/feed/` \ No newline at end of file diff --git a/functions/RssPoller/__init__.py b/functions/RssPoller/__init__.py new file mode 100644 index 0000000..6884330 --- /dev/null +++ b/functions/RssPoller/__init__.py @@ -0,0 +1,127 @@ +import logging +import os +import time +from typing import Union + +import azure.data.tables as tables +import azure.functions as func +import azure.storage.queue as queue +import feedparser + +# Gather required environment variables +CONNECTION_STRING = os.environ["TABLE_SA_CONNECTION"] +TABLE_NAME = os.environ["TABLE_NAME"] +QUEUE_NAME = os.environ["PROCESS_QUEUE_NAME"] +ENVIRONMENT = os.environ["ENVIRONMENT"] + +FEED_CHECKPOINT_KEY = "latest-feed" +ARTICLE_CHECKPOINT_KEY = "latest-article" +DELIMITER = "§" + + +def get_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> Union[float, None]: + try: + table_service = tables.TableServiceClient.from_connection_string(connection_string) + table_client = table_service.get_table_client(table_name) + checkpoint = table_client.get_entity(partition_key=partition_key, row_key=row_key) + return float(checkpoint["ts"]) + except Exception as e: + logging.warning("Exception getting checkpoint: {}".format(e)) + return None + + +def set_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str, ts: float) -> None: + try: + table_service = tables.TableServiceClient.from_connection_string(connection_string) + table_client = table_service.get_table_client(table_name) + checkpoint_out = { + "PartitionKey": partition_key, + "RowKey": row_key, + "ts": str(ts) + } + table_client.upsert_entity(checkpoint_out) + except Exception as e: + logging.warning("Exception setting checkpoint: {}".format(e)) + + +def get_rss(url: str, cloud: str, last_run: time.struct_time) -> Union[feedparser.FeedParserDict, None]: + feed = feedparser.parse(url) + if cloud == "aws": + try: + if feed.feed.published_parsed > last_run: + return feed + else: + logging.info("Feed not updated since last check") + return None + except Exception as e: + logging.warning("Exception checking {} feed publish timestamp: {}".format(cloud, e)) + return feed + elif cloud == "azure": + try: + if feed.feed.updated_parsed > last_run: + return feed + else: + logging.info("Feed not updated since last check") + return None + except Exception as e: + logging.warning("Exception checking {} feed publish timestamp: {}".format(cloud, e)) + return feed + else: + raise NotImplementedError("unexpected cloud: {}".format(cloud)) + + +def process_entry(entry: feedparser.util.FeedParserDict, cloud: str, last_run: time.struct_time, + queue_client: queue.QueueClient) -> None: + if entry.published_parsed > last_run: + logging.info("New entry: {} {} {}".format(entry.title, entry.link, time.mktime(entry.published_parsed))) + queue_client.send_message(bytes("{}{}{}".format(cloud, DELIMITER, entry.link), "utf-8")) + logging.info("Added `{}{}{}` to queue".format(cloud, DELIMITER, entry.link)) + + +# Expects message in format `cloudname§feedurl` +def main(msg: func.QueueMessage) -> None: + message = msg.get_body().decode("utf-8") + parsed_message = message.split(DELIMITER) + logging.info("Processing {}".format(message)) + cloud = parsed_message[0] + feed_url = parsed_message[1] + + feed_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, "{}-{}".format(cloud, FEED_CHECKPOINT_KEY), + ENVIRONMENT) + 
article_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, "{}-{}".format(cloud, ARTICLE_CHECKPOINT_KEY), + ENVIRONMENT) + + if feed_checkpoint is not None: + logging.info("Using {} as FEED checkpoint".format(feed_checkpoint)) + feed = get_rss(feed_url, cloud, time.gmtime(feed_checkpoint)) + else: + logging.info("No FEED checkpoint - using current time minus 30m") + feed = get_rss(feed_url, cloud, time.gmtime(time.time() - (30 * 60))) + + if feed is not None: + if article_checkpoint is not None: + logging.info("Using {} as ARTICLE checkpoint".format(article_checkpoint)) + else: + logging.info("No ARTICLE checkpoint - using current time minus 120m") + + queue_client = queue.QueueClient.from_connection_string(CONNECTION_STRING, QUEUE_NAME, + message_encode_policy=queue.BinaryBase64EncodePolicy(), + message_decode_policy=queue.BinaryBase64DecodePolicy()) + latest_article = time.localtime(0) + for entry in feed.entries: + if article_checkpoint is not None: + process_entry(entry, cloud, time.gmtime(article_checkpoint), queue_client) + else: + process_entry(entry, cloud, time.gmtime(time.time() - (120 * 60)), queue_client) + if entry.published_parsed > latest_article: + latest_article = entry.published_parsed + + set_checkpoint(CONNECTION_STRING, TABLE_NAME, "{}-{}".format(cloud, ARTICLE_CHECKPOINT_KEY), ENVIRONMENT, + time.mktime(latest_article)) + if cloud == "aws": + set_checkpoint(CONNECTION_STRING, TABLE_NAME, "{}-{}".format(cloud, FEED_CHECKPOINT_KEY), ENVIRONMENT, + time.mktime(feed.feed.published_parsed)) + elif cloud == "azure": + set_checkpoint(CONNECTION_STRING, TABLE_NAME, "{}-{}".format(cloud, FEED_CHECKPOINT_KEY), ENVIRONMENT, + time.mktime(feed.feed.updated_parsed)) + # Anything else would have thrown an exception in get_rss() diff --git a/functions/RssPoller/function.json b/functions/RssPoller/function.json new file mode 100644 index 0000000..7838a15 --- /dev/null +++ b/functions/RssPoller/function.json @@ -0,0 +1,12 @@ +{ + "scriptFile": "__init__.py", + "bindings": [ + { + "name": "msg", + "type": "queueTrigger", + "direction": "in", + "queueName": "timerqueue", + "connection": "TABLE_SA_CONNECTION" + } + ] +} diff --git a/functions/TimerFailureHandler/README.md b/functions/TimerFailureHandler/README.md new file mode 100644 index 0000000..2574e2e --- /dev/null +++ b/functions/TimerFailureHandler/README.md @@ -0,0 +1,4 @@ +# TimerFailureHandler +Seemingly Azure doesn't allow a Function to have multiple triggers. Therefore, this Function is an exact dupe (symlinked) of [ProcessFailureHandler/](../ProcessFailureHandler), but subscribed to the `timerqueue-poison` queue instead. + +Function to monitor the `-poison` Azure-generated dead-letter queue for timer processing failures (from the [RssPoller](../RssPoller) Function), and send them to Slack (with `SLACK_FAILURE_WEBHOOK`) when found. 
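Because the failure handlers are carried over as 100%-similarity renames, their body is not visible in this diff. A plausible minimal sketch of a handler with this shape, assuming only the `SLACK_FAILURE_WEBHOOK` setting described in the README, would be:

```python
# Hypothetical poison-queue handler sketch; the real ProcessFailureHandler/TimerFailureHandler
# source is unchanged by this diff and may differ.
import json
import logging
import os

import azure.functions as func
import requests

SLACK_FAILURE_WEBHOOK = os.environ["SLACK_FAILURE_WEBHOOK"]


def main(msg: func.QueueMessage) -> None:
    # The poison queue delivers the original message that repeatedly failed processing.
    failed_message = msg.get_body().decode("utf-8")
    logging.error("Poison-queue message: {}".format(failed_message))

    payload = {"text": "cloud-news-summarizer failed to process: {}".format(failed_message)}
    slack_response = requests.post(SLACK_FAILURE_WEBHOOK, json.dumps(payload))
    if slack_response.status_code != 200:
        logging.warning("Non-200 from Slack: {} {}".format(slack_response.status_code, slack_response.text))
```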
diff --git a/functions/TimerFailureHandler/__init__.py b/functions/TimerFailureHandler/__init__.py new file mode 120000 index 0000000..5bb2c0b --- /dev/null +++ b/functions/TimerFailureHandler/__init__.py @@ -0,0 +1 @@ +B:/chubb/Documents/GitHub/cloud-news-summariser/functions/ProcessFailureHandler/__init__.py \ No newline at end of file diff --git a/functions/TimerFailureHandler/function.json b/functions/TimerFailureHandler/function.json new file mode 100644 index 0000000..f047479 --- /dev/null +++ b/functions/TimerFailureHandler/function.json @@ -0,0 +1,12 @@ +{ + "scriptFile": "__init__.py", + "bindings": [ + { + "name": "msg", + "type": "queueTrigger", + "direction": "in", + "queueName": "timerqueue-poison", + "connection": "TABLE_SA_CONNECTION" + } + ] +} diff --git a/functions/TimerTrigger/README.md b/functions/TimerTrigger/README.md new file mode 100644 index 0000000..b911727 --- /dev/null +++ b/functions/TimerTrigger/README.md @@ -0,0 +1,2 @@ +# TimerTrigger +Triggers on a timer, checks which cloud feeds are enabled based on the `AWS_ENABLED` and `AZURE_ENABLED` environment variables, and adds the appropriate messages to the `timerqueue` queue for processing by the [RssPoller](../RssPoller) Function. \ No newline at end of file diff --git a/functions/TimerTrigger/__init__.py b/functions/TimerTrigger/__init__.py new file mode 100644 index 0000000..b3e6863 --- /dev/null +++ b/functions/TimerTrigger/__init__.py @@ -0,0 +1,31 @@ +import logging +import os + +import azure.functions as func +import azure.storage.queue as queue + +# Optional environment variables +AWS_ENABLED = os.getenv("AWS_ENABLED", "0") +AZURE_ENABLED = os.getenv("AZURE_ENABLED", "0") + +# Required environment variables +CONNECTION_STRING = os.environ["TABLE_SA_CONNECTION"] +QUEUE_NAME = os.environ["TIMER_QUEUE_NAME"] + +AWS_URL = "https://aws.amazon.com/about-aws/whats-new/recent/feed/" +AZURE_URL = "https://azurecomcdn.azureedge.net/en-gb/updates/feed/" +DELIMITER = "§" + + +def main(timer: func.TimerRequest) -> None: + logging.info("AWS_ENABLED: {} AZURE_ENABLED: {}".format(AWS_ENABLED, AZURE_ENABLED)) + queue_client = queue.QueueClient.from_connection_string(CONNECTION_STRING, QUEUE_NAME, + message_encode_policy=queue.BinaryBase64EncodePolicy(), + message_decode_policy=queue.BinaryBase64DecodePolicy()) + + if AWS_ENABLED == "1" or AWS_ENABLED.lower() == "true": + queue_client.send_message(bytes("aws{}{}".format(DELIMITER, AWS_URL), "utf-8")) + logging.info("Added `aws{}{}` to queue".format(DELIMITER, AWS_URL)) + if AZURE_ENABLED == "1" or AZURE_ENABLED.lower() == "true": + queue_client.send_message(bytes("azure{}{}".format(DELIMITER, AZURE_URL), "utf-8")) + logging.info("Added `azure{}{}` to queue".format(DELIMITER, AZURE_URL)) diff --git a/functions/AwsRssChecker/function.json b/functions/TimerTrigger/function.json similarity index 100% rename from functions/AwsRssChecker/function.json rename to functions/TimerTrigger/function.json diff --git a/pulumi/__main__.py b/pulumi/__main__.py index 4c06f63..61a074e 100644 --- a/pulumi/__main__.py +++ b/pulumi/__main__.py @@ -24,6 +24,11 @@ resource_group_name=resource_group.name, table_name="rsscheckpoint") +timer_queue = azure.storage.Queue("timer-queue", + account_name=account.name, + resource_group_name=resource_group.name, + queue_name="timerqueue") + process_queue = azure.storage.Queue("process-queue", account_name=account.name, resource_group_name=resource_group.name, @@ -38,7 +43,8 @@ pulumi.export("rg-name", resource_group.name) pulumi.export("sa-name", 
account.name) pulumi.export("table-name", checkpoint_table.name) -pulumi.export("queue-name", process_queue.name) +pulumi.export("timer-queue-name", timer_queue.name) +pulumi.export("process-queue-name", process_queue.name) pulumi.export("cognitive-name", cognitive_account.name) cognitive_endpoint = pulumi.Output.all(resource_group.name, cognitive_account.name)\ .apply(lambda args: azure.cognitiveservices.get_account(resource_group_name=args[0], account_name=args[1]))\
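To exercise the new wiring end to end without waiting for the 30-minute schedule, one option is to drop a timer-style message onto `timerqueue` by hand. A sketch, reusing the same client, Base64 policies and `cloud§feedurl` format as `TimerTrigger/__init__.py`; the connection string is a placeholder for your `TABLE_SA_CONNECTION` value:

```python
# Ad-hoc test: enqueue one "aws§<feed url>" message so RssPoller picks it up immediately.
import azure.storage.queue as queue

CONNECTION_STRING = "<TABLE_SA_CONNECTION value>"  # placeholder
QUEUE_NAME = "timerqueue"
AWS_URL = "https://aws.amazon.com/about-aws/whats-new/recent/feed/"
DELIMITER = "§"

queue_client = queue.QueueClient.from_connection_string(
    CONNECTION_STRING, QUEUE_NAME,
    message_encode_policy=queue.BinaryBase64EncodePolicy(),
    message_decode_policy=queue.BinaryBase64DecodePolicy())

queue_client.send_message(bytes("aws{}{}".format(DELIMITER, AWS_URL), "utf-8"))
```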