fix(proofpoint_trap): propose alternative to range adjustment
frikilax committed Feb 25, 2025
1 parent 86be2f7 commit 807a514
Showing 1 changed file with 55 additions and 46 deletions.
vulture_os/toolkit/api_parser/proofpoint_trap/proofpoint_trap.py (+55 -46)
@@ -25,11 +25,10 @@
 
 import json
 import logging
-import time
 
 import requests
 
-from datetime import datetime, timedelta
+from datetime import timedelta
 from django.utils import timezone
 from django.conf import settings
 from toolkit.api_parser.api_parser import ApiParser
@@ -41,6 +40,9 @@
 class ProofpointTRAPAPIError(Exception):
     pass
 
+class ProofpointTRAPAPIRateLimitError(Exception):
+    pass
+
 
 class ProofpointTRAPParser(ApiParser):
     HEADERS = {
@@ -58,11 +60,6 @@ def __init__(self, data):
         self.proofpoint_trap_apikey = data["proofpoint_trap_apikey"]
 
         self.session = None
-        self.range_second_list = [
-            1, 15, 30, # seconds
-            1*60, 5*60, 10*60, 15*60, 30*60, # minutes
-            1*60*60, 2*60*60, 4*60*60, 12*60*60, 24*60*60 # hours
-        ]
 
     def _connect(self):
         try:
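
For context, the list removed above drove a step-ladder of query window sizes: the old execute() walked one step down the ladder on each timeout and one step back up on each success. A minimal standalone sketch of that pattern (names assumed, not part of the commit):

RANGE_SECONDS = [
    1, 15, 30,                                     # seconds
    1*60, 5*60, 10*60, 15*60, 30*60,               # minutes
    1*60*60, 2*60*60, 4*60*60, 12*60*60, 24*60*60  # hours
]

def next_index(index: int, timed_out: bool) -> int:
    # One step down the ladder on timeout, one step back up on success.
    if timed_out:
        return max(index - 1, 0)
    return min(index + 1, len(RANGE_SECONDS) - 1)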
@@ -94,7 +91,10 @@ def __execute_query(self, url, query=None, timeout=20):
                 verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl
             )
 
-            if response.status_code != 200:
+            # handle rate limit exceeding
+            if response.status_code == 429:
+                raise ProofpointTRAPAPIRateLimitError
+            elif response.status_code != 200:
                 raise ProofpointTRAPAPIError(
                     f"Error at Proofpoint TRAP API Call URL: {url} Code: {response.status_code} Content: {response.content}")
 
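The new branch maps HTTP 429 to a dedicated, retryable exception instead of the generic ProofpointTRAPAPIError, so callers can back off rather than abort. A minimal sketch of the same dispatch pattern (names assumed):

import requests

class RateLimitError(Exception):
    """Retryable: the API asked us to slow down (HTTP 429)."""

class APIError(Exception):
    """Fatal: any other non-200 response."""

def check_response(response: requests.Response) -> requests.Response:
    if response.status_code == 429:
        raise RateLimitError("rate limit exceeded, retry later")
    elif response.status_code != 200:
        raise APIError(f"HTTP {response.status_code}: {response.content!r}")
    return response
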
@@ -106,7 +106,7 @@ def test(self):
         try:
             since = current_time - timedelta(hours=4)
             to = current_time
-            logs = self.get_logs(since=since, to=to)
+            logs, _ = self.get_logs(since=since, to=to)
 
             return {
                 "status": True,
@@ -122,16 +122,31 @@ def test(self):
 
     def get_logs(self, since, to):
         alert_url = self.proofpoint_trap_host + "/api/incidents"
 
-        # Format timestamp for query
-        since_formatted = since.isoformat()[:19] + 'Z'
-        to_formatted = to.isoformat()[:19] + 'Z'
+        while not self.evt_stop.is_set():
 
-        query = {
-            'created_after': since_formatted, # >= created_at
-            'created_before': to_formatted, # < created_at
-        }
+            # Format timestamp for query
+            since_formatted = since.isoformat()[:19] + 'Z'
+            to_formatted = to.isoformat()[:19] + 'Z'
 
-        return self.__execute_query(alert_url, query)
+            query = {
+                'created_after': since_formatted, # >= created_at
+                'created_before': to_formatted, # < created_at
+            }
 
+            try:
+                return self.__execute_query(alert_url, query), to
+            except requests.exceptions.ReadTimeout:
+                to = to - (to - since) / 2
+                assert (to - since) > timedelta(seconds=1), "Reduced range is too small, cannot continue"
+                logger.warning(f"[{__parser__}]:get_logs: Read Timeout: decreasing time range: {since} -> {to}",
+                               extra={'frontend': str(self.frontend)})
+                continue
+            except ProofpointTRAPAPIRateLimitError:
+                logger.info(f"[{__parser__}]:get_logs: API Rate limit exceeded, waiting 10 seconds...",
+                            extra={'frontend': str(self.frontend)})
+                self.evt_stop.wait(10)
+
+        return None, None
 
     def format_incidents_logs(self, incident_log):
 
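The loop above replaces the old ladder with a simpler policy: halve the [since, to] window on every read timeout (asserting it never drops below one second), wait 10 seconds on a rate limit, and return the effective 'to' alongside the logs so the caller knows how far the fetch actually reached. A standalone sketch of the halving retry (hypothetical fetch callback, not the commit's code):

from datetime import datetime, timedelta
from typing import Any, Callable, Tuple

def shrink_until_ok(fetch: Callable[[datetime, datetime], Any],
                    since: datetime, to: datetime) -> Tuple[Any, datetime]:
    # Halve the window on each timeout; give up below a one-second span.
    while True:
        try:
            return fetch(since, to), to
        except TimeoutError:
            to = to - (to - since) / 2
            if (to - since) <= timedelta(seconds=1):
                raise RuntimeError("Reduced range is too small, cannot continue")
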
@@ -187,44 +202,38 @@ def format_alerts_logs(self, alert_log):
         return json.dumps(alert_log)
 
     def execute(self):
-        range_index = len(self.range_second_list) - 1
-
         since = self.frontend.last_api_call or (timezone.now() - timedelta(days=30))
-        max_to = min(timezone.now(), since + timedelta(hours=24))
-        to = min(max_to, since + timedelta(seconds=self.range_second_list[range_index]))
 
         while not self.evt_stop.is_set():
-            msg = f"Parser starting from {since} to {to}"
-            logger.info(f"[{__parser__}]:execute: {msg}", extra={'frontend': str(self.frontend)})
+            # fetch at most 24h of logs to avoid the process running for too long
+            to = min(timezone.now(), since + timedelta(hours=24))
 
-            # Downloading may take some while, so refresh token in Redis
-            self.update_lock()
-            try:
-                response = self.get_logs(since, to)
-                # delay the times of 15 minutes, to get the "updated" event
-                to = to - timedelta(minutes=15)
-
-                if range_index < len(self.range_second_list) - 1:
-                    range_index += 1
-            except requests.exceptions.ReadTimeout:
-                if range_index > 0:
-                    range_index -= 1
-                to = min(max_to, since + timedelta(seconds=self.range_second_list[range_index]))
-                logger.warning(f"[{__parser__}]:get_logs: Timeout: down range {timedelta(seconds=self.range_second_list[range_index])}",
-                               extra={'frontend': str(self.frontend)})
-                continue
+            msg = f"Parser starting from {since} to {to}"
+            logger.info(f"[{__parser__}]:execute: {msg}", extra={'frontend': str(self.frontend)})
+
+            response, to = self.get_logs(since, to)
+
+            # Downloading may take some while, so refresh token in Redis
+            self.update_lock()
 
             if response:
                 logger.info(f"[{__parser__}]: GET LOG OK", extra={'frontend': str(self.frontend)})
 
-                alerts_logs = []
                 for incident_log in response:
                     formated_incident_log = self.format_incidents_logs(incident_log)
-                    alerts_logs.extend([self.format_alerts_logs(log) for log in formated_incident_log['events']])
-                self.write_to_file(alerts_logs)
+                    alert_logs = formated_incident_log['events']
 
-                # update last_api_call only if logs are retrieved
-                since = to
-                to = min(max_to, since + timedelta(seconds=self.range_second_list[range_index]))
-                self.frontend.last_api_call = since
+                    self.write_to_file([self.format_alerts_logs(log) for log in alert_logs])
 
-                logger.info(f"[{__parser__}]:execute: Parsing done.", extra={'frontend': str(self.frontend)})
+                    # Writing may take some while, so refresh token in Redis
+                    self.update_lock()
 
-            if to > max_to or since >= max_to:
-                return
+                # update last_api_call only if logs are retrieved
+                self.frontend.last_api_call = to + timedelta(seconds=1)
+            elif since < timezone.now() - timedelta(hours=24):
+                self.frontend.last_api_call = since + timedelta(hours=1)
+
+        logger.info(f"[{__parser__}]:execute: Parsing done.", extra={'frontend': str(self.frontend)})

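With the ladder gone, execute() now derives each window directly: at most 24 hours per pass, with the cursor advanced one second past the fetched window when logs are returned, and bumped one hour forward when the parser lags more than 24 hours with nothing to show. A sketch of that cursor rule (an assumed simplification, not the commit's exact code):

from datetime import datetime, timedelta, timezone

def advance_cursor(last_call: datetime, got_logs: bool, to: datetime) -> datetime:
    now = datetime.now(timezone.utc)
    if got_logs:
        # Resume one second after the window we just drained.
        return to + timedelta(seconds=1)
    if last_call < now - timedelta(hours=24):
        # Nothing returned and we are far behind: skip ahead an hour.
        return last_call + timedelta(hours=1)
    return last_call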