Skip to content

Commit

Permalink
'Refactored by Sourcery'
Browse files Browse the repository at this point in the history
  • Loading branch information
SourceryAI committed Jul 30, 2023
1 parent d74920b commit 6324daa
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 82 deletions.
4 changes: 1 addition & 3 deletions core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ def assemble_cors_origins(

@validator("API_V1_STR", pre=True)
def assemble_api_v1_str(cls, v: str) -> str:
if v.startswith("/"):
return v
return f"/{v}"
return v if v.startswith("/") else f"/{v}"

class Config:
case_sensitive = True
Expand Down
20 changes: 8 additions & 12 deletions lib/auth/auth_bearer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,19 @@ def __init__(self, auto_error: bool = True):

async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
payload, isTokenValid = self.verify_jwt(credentials.credentials)
if not isTokenValid:
raise HTTPException(status_code=403, detail="Invalid token or expired token.")
return payload, credentials.credentials
else:
if not credentials:
raise HTTPException(status_code=403, detail="Invalid authorization code.")
if credentials.scheme != "Bearer":
raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
payload, isTokenValid = self.verify_jwt(credentials.credentials)
if not isTokenValid:
raise HTTPException(status_code=403, detail="Invalid token or expired token.")
return payload, credentials.credentials

def verify_jwt(self, jwtoken: str) -> bool:
isTokenValid: bool = False

try:
payload = decodeJWT(jwtoken)
except:
payload = None
if payload:
isTokenValid = True
isTokenValid = bool(payload)
return payload, isTokenValid
35 changes: 16 additions & 19 deletions lib/recon/cors_misconfig/cors_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,15 @@ def __init__(self, target):
self.netloc = self.parsed.netloc
self.scheme = self.parsed.scheme

self.url = self.scheme + "://" + self.netloc + (self.parsed.path
or '/')
self.url = f"{self.scheme}://{self.netloc}" + (self.parsed.path or '/')

self.delay = 0.5
self.results = {}

def scan(self):
threadpool = ThreadPoolExecutor(max_workers=2)
future = threadpool.submit(self.cors)
result = future.result()
if result:
if result := future.result():
self.results.update(result)
return self.results[self.url]
else:
Expand All @@ -65,15 +63,15 @@ def cors(self):
print("Connection error: ", exc)

def active_tests(self, url, root, scheme, header_dict, delay):
origin = scheme + '://' + root
origin = f'{scheme}://{root}'
headers = self._requester(url, scheme, header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
None), headers.get('access-control-allow-credentials', None)
if acao_header is None:
return

origin = scheme + '://' + 'example.com'
origin = f'{scheme}://example.com'
headers = self._requester(url, scheme, header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
Expand All @@ -85,7 +83,7 @@ def active_tests(self, url, root, scheme, header_dict, delay):
return {url: info}
time.sleep(delay)

origin = scheme + '://' + root + '.example.com'
origin = f'{scheme}://{root}.example.com'
headers = self._requester(url, scheme, header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
Expand All @@ -97,7 +95,7 @@ def active_tests(self, url, root, scheme, header_dict, delay):
return {url: info}
time.sleep(delay)

origin = scheme + '://d3v' + root
origin = f'{scheme}://d3v{root}'
headers = self._requester(url, scheme, header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
Expand All @@ -121,7 +119,7 @@ def active_tests(self, url, root, scheme, header_dict, delay):
return {url: info}
time.sleep(delay)

origin = scheme + '://' + root + '_.example.com'
origin = f'{scheme}://{root}_.example.com'
headers = self._requester(url, scheme, header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
Expand All @@ -133,7 +131,7 @@ def active_tests(self, url, root, scheme, header_dict, delay):
return {url: info}
time.sleep(delay)

origin = scheme + '://' + root + '%60.example.com'
origin = f'{scheme}://{root}%60.example.com'
headers = self._requester(url, scheme, header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
Expand All @@ -146,7 +144,7 @@ def active_tests(self, url, root, scheme, header_dict, delay):
time.sleep(delay)

if root.count('.') > 1:
origin = scheme + '://' + root.replace('.', 'x', 1)
origin = f'{scheme}://' + root.replace('.', 'x', 1)
headers = self._requester(url, scheme, header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
Expand All @@ -157,18 +155,17 @@ def active_tests(self, url, root, scheme, header_dict, delay):
info['acac header'] = acac_header
return {url: info}
time.sleep(delay)
origin = 'http://' + root
origin = f'http://{root}'
headers = self._requester(url, 'http', header_dict, origin)
acao_header, acac_header = headers.get(
'access-control-allow-origin',
None), headers.get('access-control-allow-credentials', None)
if acao_header and acao_header.startswith('http://'):
info = self.details['http origin allowed']
info['acao header'] = acao_header
info['acac header'] = acac_header
return {url: info}
else:
if not acao_header or not acao_header.startswith('http://'):
return self.passive_tests(url, headers)
info = self.details['http origin allowed']
info['acao header'] = acao_header
info['acac header'] = acac_header
return {url: info}

def passive_tests(self, url, headers):
root = host(url)
Expand All @@ -192,7 +189,7 @@ def _requester(self, url, scheme, headers, origin):
try:
response = requests.get(url, headers=headers, verify=False)
headers = response.headers
for key, value in headers.items():
for key in headers:
if key.lower() == 'access-control-allow-origin':
return headers
except requests.exceptions.RequestException as e:
Expand Down
9 changes: 3 additions & 6 deletions lib/recon/dnsscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self, host):
self.spf = []
self.dmarc = []
self.dnssec = False
self.ddos = dict()
self.ddos = {}
self.issues = []

def scan(self):
Expand Down Expand Up @@ -88,8 +88,7 @@ def _get_txt(self):

def _get_srv(self):
try:
srv_answers = dns.resolver.resolve('_sip._tcp.' + self.domain,
'SRV')
srv_answers = dns.resolver.resolve(f'_sip._tcp.{self.domain}', 'SRV')
for srv_rdata in srv_answers:
self.srv.append(str(srv_rdata.target).rstrip('.'))
except:
Expand All @@ -108,9 +107,7 @@ def _check_dnssec(self):
def _is_same_subnet(self, ip1, ip2):
ip1 = ip1.split('.')
ip2 = ip2.split('.')
if ip1[0] == ip2[0] and ip1[1] == ip2[1] and ip1[2] == ip2[2]:
return True
return False
return ip1[0] == ip2[0] and ip1[1] == ip2[1] and ip1[2] == ip2[2]

def _check_dns_security(self):

Expand Down
80 changes: 38 additions & 42 deletions lib/recon/security_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def scan(self):
Returns:
dict: Analysis of security headers
"""
analysis = {}
analysis['X-XSS-Protection'] = self._check_xss_protection()
analysis = {'X-XSS-Protection': self._check_xss_protection()}
analysis[
'Content-Security-Policy'] = self._check_content_security_policy()
analysis[
Expand All @@ -35,8 +34,7 @@ def _check_xss_protection(self):
Returns:
dict: Analysis of X-XSS-Protection header
"""
xss_protection = self.headers.get('X-XSS-Protection')
if xss_protection:
if xss_protection := self.headers.get('X-XSS-Protection'):
return {'status': "enabled", 'policy': xss_protection}
else:
return {
Expand All @@ -54,8 +52,7 @@ def _check_content_security_policy(self):
Returns:
dict: Analysis of Content-Security-Policy header
"""
content_security_policy = self.headers.get('Content-Security-Policy')
if content_security_policy:
if content_security_policy := self.headers.get('Content-Security-Policy'):
return {
'status': "enabled",
'policy': content_security_policy,
Expand All @@ -76,32 +73,33 @@ def _check_strict_transport_security(self):
Returns:
dict: Analysis of Strict-Transport-Security header
"""
strict_transport_security = self.headers.get(
'Strict-Transport-Security')
if strict_transport_security:
analysis = {
'status': "enabled",
'policy': strict_transport_security,
}
max_age = strict_transport_security.split('max-age=')[1].split(
';')[0]
if int(max_age) < 31536000:
analysis = {
'status': "enabled",
'policy': strict_transport_security,
'solution':
'Increase max-age to at least 31536000 seconds (1 year) to ensure long-term protection against protocol downgrade attacks',
'severity': 'medium'
}
return analysis
else:
if not (
strict_transport_security := self.headers.get(
'Strict-Transport-Security'
)
):
return {
'status': "not set",
'policy': 'not set',
'solution':
'Set Strict-Transport-Security header to enable HTTPS-only mode and protect against protocol downgrade attacks',
'severity': 'high'
}
analysis = {
'status': "enabled",
'policy': strict_transport_security,
}
max_age = strict_transport_security.split('max-age=')[1].split(
';')[0]
if int(max_age) < 31536000:
analysis = {
'status': "enabled",
'policy': strict_transport_security,
'solution':
'Increase max-age to at least 31536000 seconds (1 year) to ensure long-term protection against protocol downgrade attacks',
'severity': 'medium'
}
return analysis

@exception()
def _check_x_frame_options(self):
Expand All @@ -110,26 +108,24 @@ def _check_x_frame_options(self):
Returns:
dict: Analysis of X-Frame-Options header
"""
x_frame_options = self.headers.get('X-Frame-Options')
if x_frame_options:
analysis = x_frame_options
if x_frame_options != 'DENY' and x_frame_options != 'SAMEORIGIN':
analysis = {
'status': x_frame_options,
'solution':
'Set X-Frame-Options header to DENY or SAMEORIGIN to prevent clickjacking attacks',
'severity': 'medium'
}
else:
analysis = {
'status': 'enabled',
'policy': x_frame_options,
}
return analysis
else:
if not (x_frame_options := self.headers.get('X-Frame-Options')):
return {
'status': 'not set',
'solution':
'Set X-Frame-Options header to DENY or SAMEORIGIN to prevent clickjacking attacks',
'severity': 'high'
}
analysis = x_frame_options
analysis = (
{
'status': x_frame_options,
'solution': 'Set X-Frame-Options header to DENY or SAMEORIGIN to prevent clickjacking attacks',
'severity': 'medium',
}
if x_frame_options not in ['DENY', 'SAMEORIGIN']
else {
'status': 'enabled',
'policy': x_frame_options,
}
)
return analysis

0 comments on commit 6324daa

Please sign in to comment.