Skip to content

Commit

Permalink
Update Umbrella.py
Browse files Browse the repository at this point in the history
This script has initialy been written for the Umbrella API v1.
This API has been retired since March 22.
We recently had the need to use it, so I changed the code to work with the v2 API.
  • Loading branch information
Noatun authored Feb 27, 2025
1 parent 8ca08b4 commit 1479425
Showing 1 changed file with 59 additions and 24 deletions.
83 changes: 59 additions & 24 deletions analyzers/Umbrella/Umbrella.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# encoding: utf-8
import json
import requests
from base64 import b64encode
from cortexutils.analyzer import Analyzer

class UmbrellaAnalyzer(Analyzer):
Expand All @@ -12,45 +13,79 @@ def __init__(self):
self.api_secret = self.get_param('config.api_secret', None, 'api_secret is missing')
self.organization_id = self.get_param('config.organization_id', None, 'organization_id is missing')
self.query_limit = str(self.get_param('config.query_limit', 20))
self.token = None

def umbrella_runreport(self, destination):
base_url = "https://reports.api.umbrella.com/v1/organizations"
url = "{}/{}/destinations/{}/activity?limit={}".format(base_url,self.organization_id,destination,self.query_limit)
try:
r = requests.get(url, auth=(self.api_key, self.api_secret))
if r.status_code == 200:
return json.loads(r.text)
else:
self.error('API query failed. Check parameters.')
except Exception as e:
self.unexpectedError(e)
token = self.get_bearer_token()
headers = {
'Authorization': f'Bearer {self.token}',
'Content-Type': 'application/json'
}

report_url = f"https://reports.api.umbrella.com/v2/organizations/{self.organization_id}/activity?from=-7days&to=now&domains={destination}&limit={self.query_limit}"

response = requests.get(report_url, headers=headers)
print(response)
if response.status_code == 200:
return json.loads(response.text)
else:
print(f"Failed to get categories: {response.text}")
return None

def get_bearer_token(self):
auth_url = "https://api.umbrella.com/auth/v2/token"
credentials = f"{self.api_key}:{self.api_secret}"
encoded_credentials = b64encode(credentials.encode()).decode()

headers = {
'Authorization': f'Basic {encoded_credentials}',
'Content-Type': 'application/json'
}

response = requests.post(auth_url, headers=headers)
if response.status_code == 200:
token_data = response.json()
self.token = token_data['access_token']
#print(self.token)
return self.token
else:
print(f"Failed to get bearer token: {response.text}")
return None

def summary(self, raw):
taxonomies = []
level = "info"
namespace = "Umbrella"
predicate = "GetReport"
value = "0"

if len(raw['requests']) > 0:
taxonomies.append(self.build_taxonomy(
'info',
'Umbrella',
'Hits',
'True'))
else:
taxonomies.append(self.build_taxonomy(
'info',
'Umbrella',
'Hits',
'False'))
if "data" in raw and len(raw["data"]) > 0:
item = raw["data"][0]
if "verdict" in item:
verdicts = item['verdict']
value = "{}".format(verdicts)

if verdicts.lower() in ["allowed", "passed", "none"]:
level = "safe"
elif verdicts.lower() in ["blocked", "rejected", "failed"]:
level = "malicious"
else:
level = "suspicious"

taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))

return {'taxonomies': taxonomies}



def run(self):
# Map The Hive observable types to Umbrella observable types
observable_mapping = {
"domain": "domain",
"fqdn": "domain",
"fqdn": "domain",
}



if self.service == 'get':
dataType = self.get_param("dataType")

Expand Down

0 comments on commit 1479425

Please sign in to comment.