# phish_detector.py
import random
import sys
import json
import time
import requests
import whois
from bs4 import BeautifulSoup as bsoup
from PIL import Image
from rich.table import Table
from rich import print as printc
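
# PhishDetector bundles several lightweight checks around a single suspicious URL:
# redirection tracing via iplogger.org, WHOIS lookups, URL defanging, Google Safe
# Browsing verdicts, VirusTotal and urlscan.io scans, and AbuseIPDB reputation checks.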
class PhishDetector:
    def __init__(self, url: str):
        # Only accept http(s) URLs whose host is a domain name (not a bare IP address)
        if url.startswith('http') and not self.get_domain_name(url).replace(".", "").isdigit():
            self.url = url
            self.defanged_url = self.get_defanged_url(self.url)
            self.expanded_url = ""
            self.servers = ""
            self.target_ip_address = ""  # Populated by get_url_redirections()
            self.target_webpage_screenshot = ""
        else:
            printc(f"[red3][-][/red3] {url}: Invalid url specified (e.g.: https://example.com)!")
            sys.exit(1)
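    # Print WHOIS details for the resolved target via python-whois;
    # verbose mode dumps every field, otherwise only the most relevant ones.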
    def get_whois_info(self, target_ip_address: str, verbosity: bool) -> None:
        try:
            target_whois_info = whois.whois(target_ip_address)
            if verbosity:
                # Verbose mode: print every WHOIS field except the status list
                for key, value in target_whois_info.items():
                    if key != "status":
                        if isinstance(value, list):
                            if 'date' in key:
                                printc(f"[spring_green2][+][/spring_green2] {key.capitalize()}: {value[0]}")
                            else:
                                printc(f"[spring_green2][+][/spring_green2] {key.capitalize()}:", *value)
                        else:
                            if value is None:
                                printc(f"[red3][-][/red3] {key.capitalize()}: N/A")
                            else:
                                printc(f"[spring_green2][+][/spring_green2] {key.capitalize()}: {value}")
            else:
                # Default mode: only print the most relevant WHOIS fields
                whois_keys = ['name', 'emails', 'address', 'registrant_postal_code', 'registrar', 'creation_date', 'updated_date', 'expiration_date', 'country']
                for key, value in target_whois_info.items():
                    if key in whois_keys:
                        if isinstance(value, list):
                            if 'date' in key:
                                printc(f"[spring_green2][+][/spring_green2] {key.capitalize()}: {value[0]}")
                            else:
                                printc(f"[spring_green2][+][/spring_green2] {key.capitalize()}:", *value)
                        else:
                            if value is None:
                                printc(f"[red3][-][/red3] {key.capitalize()}: N/A")
                            else:
                                printc(f"[spring_green2][+][/spring_green2] {key.capitalize()}: {value}")
        except Exception:
            printc("[red3][-][/red3] Oops, something went wrong :(")
            printc("[red3][-] Unable to retrieve whois information!![/red3]")
    @staticmethod
    def get_user_agent() -> str:
        # Pick a random user-agent from the local database (one entry per line)
        with open('db/user_agents.db') as f:
            user_agents = f.readlines()
        return random.choice(user_agents).strip()
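    # Trace the target url's redirection chain through iplogger.org's URL checker
    # and record each hop (host, IP address, status code, Safe Browsing verdict).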
    def get_url_redirections(self, verbosity: bool) -> None:
        # Set the HTTP request headers
        headers = {
            'Accept-Encoding': 'gzip, deflate, br',
            'User-Agent': self.get_user_agent(),
            'Referer': 'https://iplogger.org/',
            'DNT': '1',
            'Upgrade-Insecure-Requests': '1',
        }
        # Check the target url's redirection(s)
        ip_logger_url_checker = "https://iplogger.org/url-checker/"
        with requests.Session() as session:
            response = session.get(ip_logger_url_checker, headers=headers)
            # Mimic an authentic request (avoid detection)
            if 'Set-Cookie' in response.headers:
                headers['Cookie'] = response.headers['Set-Cookie']
            if 'Cache-Control' in response.headers:
                headers['Cache-Control'] = response.headers['Cache-Control']
            if 'Last-Modified' in response.headers:
                headers['If-Modified-Since'] = response.headers['Last-Modified']
            params = {"url": self.url}
            response = session.get(ip_logger_url_checker, headers=headers, params=params)
        self.servers = list()  # List of dictionaries, one per server the request goes through
        if response.ok:
            soup = bsoup(response.content, 'html.parser')
            servers_info = soup.find_all("div", class_="server-info")
            for server_info in servers_info:
                server_items = server_info.find_all("div", class_="server-item")
                server_antivirus = server_info.find("div", class_="server-antivirus")
                server_next = server_info.find("div", class_="server-next")
                server_item_info = list()
                server_dict = dict()  # Information about each server the request goes through
                for server_item in server_items:
                    for item in server_item:
                        if item != "\n":
                            server_item_info.append(item)
                    if server_item_info[0].string == "Host":
                        server_dict[server_item_info[0].string] = server_item_info[-1].string
                        self.expanded_url = server_item_info[-1].string
                    elif server_item_info[0].string == "IP address":
                        server_dict[server_item_info[0].string] = server_item_info[-1].contents[-2].string
                        self.target_ip_address = server_item_info[-1].contents[-2].string
                    else:
                        server_dict[server_item_info[0].string] = server_item_info[-1].string
                    server_item_info.clear()
                server_dict["Status code"] = server_next.contents[1].string
                server_dict["Google Safe Browsing Database"] = server_antivirus.contents[1].string
                self.servers.append(server_dict)
        # Display the url's information based on the verbosity
        number_of_redirections = len(self.servers)
        if verbosity and number_of_redirections > 1:
            table = Table(title="ℝ 𝔼 𝔻 𝕀 ℝ 𝔼 ℂ 𝕋 𝕀 𝕆 ℕ 𝕊", show_lines=True)
            table.add_column("ID", justify="center")
            table.add_column("URL", justify="center", max_width=60)
            table.add_column("Status Code", justify="center")
            table.add_column("IP Address", justify="center")
            table.add_column("Country by IP", justify="center")
            for server_index in range(number_of_redirections):
                table.add_row(str(server_index + 1), self.servers[server_index]['Host'], self.servers[server_index]['Status code'], self.servers[server_index]['IP address'], self.servers[server_index]['Country by IP'])
            printc(table)
        elif number_of_redirections > 1:
            table = Table(title="ℝ 𝔼 𝔻 𝕀 ℝ 𝔼 ℂ 𝕋 𝕀 𝕆 ℕ 𝕊", show_lines=True)
            table.add_column("Source URL", justify="center", max_width=60)
            table.add_column("Source Domain", justify="center")
            table.add_column("Destination URL", justify="center", max_width=60)
            table.add_column("Destination Domain", justify="center")
            table.add_row(self.url, self.get_domain_name(self.url), self.expanded_url, self.get_domain_name(self.expanded_url))
            printc(table)
        else:
            printc('[red3][-][/red3] No redirection found!')
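    # Defang the url so it can be shared safely,
    # e.g. "https://login.example.com/verify" -> "hxxps[://]login[.]example[.]com/verify".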
    def get_defanged_url(self, url: str) -> str:
        url_parts = url.split("/")
        scheme = url_parts[0].replace("https:", "hxxps").replace("http:", "hxxp")
        authority = self.get_domain_name(url).replace(".", "[.]")
        path = "/".join(url_parts[3:])  # Keep the whole path; empty if the url has none
        defanged_url = scheme + "[://]" + authority + "/" + path
        return defanged_url
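    # Print the Google Safe Browsing verdict that iplogger.org reported for the final hop.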
    def check_google_safe_browsing(self) -> None:
        # Check Google Safe Browsing for the last server in the redirection chain
        last_hop_index = len(self.servers) - 1
        # Remove the protocol from the target url
        target_url = self.expanded_url.replace("https://", "").replace("http://", "")
        if "no such URL in our anti-virus databases" in self.servers[last_hop_index]['Google Safe Browsing Database']:
            print("N/A")
        else:
            printc(f"[gold1][!][/gold1] [gold1]{target_url}[/gold1]: [red3 b]{self.servers[last_hop_index]['Google Safe Browsing Database']}[/red3 b]")
    def get_domain_name(self, url: str) -> str:
        # Return the authority part of the url (e.g. "https://example.com/a" -> "example.com")
        url_parts = url.split('/')
        return url_parts[2]
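    # Flag domains that belong to known IP tracking services (db/ip_tracking_domains.json).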
    def check_tracking_domain_name(self) -> None:
        target_domain_name = self.get_domain_name(self.url)
        with open("db/ip_tracking_domains.json") as f:
            data = json.load(f)
        for ip_tracker_provider, ip_tracking_domain in data.items():
            if ip_tracking_domain == target_domain_name:
                printc(f"[gold1][!][/gold1] [gold1]{target_domain_name}[/gold1] is an IP tracking domain name owned by [gold1]{ip_tracker_provider}[/gold1]!")
                break
        else:
            print("N/A")
    def check_url_shortener_domain(self) -> None:
        target_domain_name = self.get_domain_name(self.url)
        with open('db/url_shortener_domains.db') as f:
            url_shortener_domains = f.readlines()
        for url_shortener_domain in url_shortener_domains:
            if url_shortener_domain.strip() == target_domain_name:
                printc(f"[gold1][!][/gold1] [gold1]{target_domain_name}[/gold1] found in url shortener domains database!")
                printc(f"[gold1][!][/gold1] [red3]{self.defanged_url}[/red3] is a [gold1]shortened[/gold1] url!")
                break
        else:
            print("N/A")
    def webpage_illustration(self):
        # Prefer the urlscan.io screenshot when available, otherwise fall back to PagePeeker
        if self.target_webpage_screenshot != "":
            webpage_screenshot = requests.get(self.target_webpage_screenshot, stream=True)
        else:
            pagepeeker_url = "https://api.pagepeeker.com/v2/thumbs.php"
            params = {"size": "x", "url": self.expanded_url}
            webpage_screenshot = requests.get(pagepeeker_url, headers={"User-Agent": self.get_user_agent(), "Referer": "https://pagepeeker.com/"}, params=params, stream=True)
        if webpage_screenshot.status_code == 200:
            user_choice = input(f"Would you like to see a real-time screenshot of {self.defanged_url} [Yes/no]: ")
            if user_choice.lower() in ['', 'y', 'yes', 'yep', 'yeah', 'yay']:
                with Image.open(webpage_screenshot.raw) as img:
                    try:
                        img.show()
                    except Exception:
                        printc("[red3][-][/red3] An error occurred: screenshot unavailable")
        else:
            printc("[red3][-][/red3] Screenshot unavailable!!")
    def check_virustotal(self, target_url: str, api_key: str, verbosity: bool) -> None:
        url = "https://www.virustotal.com/api/v3/urls"
        payload = f"url={target_url}"
        headers = {
            "accept": "application/json",
            "x-apikey": api_key,
            "content-type": "application/x-www-form-urlencoded"
        }
        max_wait_time = 60
        wait_time = 10
        elapsed_time = 0
        # Submit the url for analysis
        response = requests.post(url, data=payload, headers=headers)
        if response.status_code == 200:
            url_scan_link = response.json()['data']['links']['self']
            # Poll the analysis report until results are available or the timeout is reached
            while elapsed_time < max_wait_time:
                url_analysis_report = requests.get(url_scan_link, headers=headers)
                if url_analysis_report.status_code == 200:
                    url_analysis_report_json = url_analysis_report.json()
                    url_analysis_report_id = url_analysis_report_json['meta']['url_info']['id']
                    total_number_of_vendors = len(url_analysis_report_json['data']['attributes']['results'].keys())
                    url_report_gui = "https://www.virustotal.com/gui/url/" + url_analysis_report_id
                    url_scan_stats = url_analysis_report_json['data']['attributes']['stats']
                    malicious_stats = url_scan_stats['malicious']
                    results = url_analysis_report_json['data']['attributes']['results']
                    if total_number_of_vendors > 0:
                        if malicious_stats > 0:
                            printc(f"[gold1][!][/gold1] [red3]{malicious_stats} security vendors flagged this URL as malicious[/red3]")
                        else:
                            printc("[spring_green2][+][/spring_green2] No security vendors flagged this URL as malicious")
                        printc(f"[spring_green2][+][/spring_green2] Security vendors' analysis\n{'-'*32}")
                        if verbosity:
                            for stat, stat_value in url_scan_stats.items():
                                printc(f"[gold1][!][/gold1] {stat}: {stat_value}/{total_number_of_vendors}")
                            if malicious_stats > 0:
                                table = Table(title="𝔻 𝔼 𝕋 𝔸 𝕀 𝕃 𝕊", show_lines=True)
                                table.add_column("VENDOR", justify="center", max_width=60)
                                table.add_column("RESULT", justify="center")
                                table.add_column("METHOD", justify="center")
                                for key, value in results.items():
                                    if value['category'] == "malicious":
                                        table.add_row(key, value['result'], value['method'])
                                printc(table)
                        else:
                            for stat, stat_value in url_scan_stats.items():
                                printc(f"[gold1][!][/gold1] {stat}: {stat_value}/{total_number_of_vendors}")
                        printc("[spring_green2][+][/spring_green2] For more information, you can check the link below ↓")
                        printc(f"[spring_green2][+][/spring_green2] {url_report_gui}")
                        break
                    else:
                        printc(f"[gold1][!][/gold1] Scan still in progress. Waiting for {wait_time} seconds...")
                        time.sleep(wait_time)
                        elapsed_time += wait_time
                        wait_time = 5
                else:
                    printc(f"[red3][-][/red3] {url_analysis_report.text}")
                    break  # Unexpected response: stop polling
        else:
            printc(f"[red3][-][/red3] {response.text}")
    def check_urlscan_io(self, target_url: str, api_key: str, verbosity: bool) -> None:
        max_wait_time = 120  # 2 minutes
        wait_time = 10  # initial wait time
        elapsed_time = 0
        headers = {'API-Key': api_key, 'Content-Type': 'application/json'}
        data = {"url": target_url, "visibility": "unlisted"}
        # Submit the url for scanning
        response = requests.post('https://urlscan.io/api/v1/scan/', headers=headers, data=json.dumps(data))
        if response.status_code == 200:
            response_json = response.json()
            result_api_url = response_json['api']
            # Poll the result API until the scan finishes or the timeout is reached
            while elapsed_time < max_wait_time:
                response_api_url = requests.get(result_api_url)
                if response_api_url.status_code == 200:
                    self.target_webpage_screenshot = response_api_url.json()['task']['screenshotURL']
                    verdict_overall = response_api_url.json()['verdicts']['overall']
                    verdict_urlscan = response_api_url.json()['verdicts']['urlscan']
                    if verdict_overall['score'] > 0:
                        printc(f"\n[spring_green2][+][/spring_green2] Verdict overall\n{'-'*20}")
                        printc(f"[spring_green2][+][/spring_green2] Time: {response_api_url.json()['task']['time']}")
                        for verdict_overall_property, verdict_overall_value in verdict_overall.items():
                            if isinstance(verdict_overall_value, list):
                                printc(f"[gold1][!][/gold1] {verdict_overall_property}: {verdict_overall_value[0]}")
                            else:
                                printc(f"[gold1][!][/gold1] {verdict_overall_property}: {verdict_overall_value}")
                        if verbosity:
                            printc(f"\n[spring_green2][+][/spring_green2] Verdict urlscan\n{'-'*20}")
                            for verdict_urlscan_property, verdict_urlscan_value in verdict_urlscan.items():
                                if isinstance(verdict_urlscan_value, list):
                                    if verdict_urlscan_property == 'brands':
                                        for brand_key, brand_value in verdict_urlscan_value[0].items():
                                            if brand_value != "":
                                                printc(f"[gold1][!][/gold1] Brand {brand_key}: {brand_value}")
                                            else:
                                                printc(f"[red3][-][/red3] Brand {brand_key}: N/A")
                                else:
                                    if verdict_urlscan_property in ['score', 'malicious']:
                                        printc(f"[gold1][!][/gold1] {verdict_urlscan_property}: {verdict_urlscan_value}")
                            printc("[gold1][!][/gold1] For more information about the report, you can check the link below ↓")
                            printc(f"[spring_green2][+][/spring_green2] {response_api_url.json()['task']['reportURL']}")
                    else:
                        printc(f"\n[gold1][!][/gold1] Verdict urlscan\n{'-'*20}")
                        printc(f"[gold1][!][/gold1] Score: {verdict_urlscan['score']}")
                        printc(f"[gold1][!][/gold1] Malicious: {verdict_urlscan['malicious']}")
                        printc(f"\n[gold1][!][/gold1] Verdict Overall\n{'-'*20}")
                        printc(f"[gold1][!][/gold1] Score: {verdict_overall['score']}")
                        printc(f"[gold1][!][/gold1] Malicious: {verdict_overall['malicious']}")
                        printc("[spring_green2][+][/spring_green2] For more information about the report, you can check the link below ↓")
                        printc(f"[spring_green2][+][/spring_green2] {response_api_url.json()['task']['reportURL']}")
                    break
                elif response_api_url.status_code == 404:
                    printc(f"[gold1][!][/gold1] Scan still in progress. Waiting for {wait_time} seconds...")
                    time.sleep(wait_time)
                    elapsed_time += wait_time
                    wait_time = 5
                else:
                    printc(f"[red3][-][/red3] Unexpected HTTP response code ({response_api_url.status_code}) returned!!")
                    break  # Stop polling on an unexpected response
        elif response.status_code == 400:
            printc(f"[red3][-][/red3] {response.json()['message']}")
        elif response.status_code == 429:
            printc("[red3][!][/red3] urlscan.io rate-limit exceeded!")
            printc("[gold1][!][/gold1] You can find more information here: https://urlscan.io/docs/api/#ratelimit")
        else:
            printc(f"[red3][-][/red3] {response.text}")
            printc("[gold1][!][/gold1] Please report this issue at https://github.com/0liverFlow/HookPhish/issues")
    def check_abuse_ip_db(self, ip_address: str, api_key: str, verbosity: bool) -> None:
        url = 'https://api.abuseipdb.com/api/v2/check'
        querystring = {
            'ipAddress': ip_address,
            'maxAgeInDays': '365'
        }
        headers = {
            'Accept': 'application/json',
            'Key': api_key
        }
        response = requests.get(url, headers=headers, params=querystring)
        if response.ok:
            # Formatted output
            decoded_response = response.json()
            ip_info_dict = dict(decoded_response['data'])
            if ip_info_dict['totalReports']:
                printc(f"[gold1][!][/gold1] [gold1]{ip_address}[/gold1] was found in Abuse IP DB!")
                printc(f"[gold1][!][/gold1] This IP was reported [gold1]{ip_info_dict['totalReports']}[/gold1] times by [gold1]{ip_info_dict['numDistinctUsers']}[/gold1] distinct users.")
                printc(f"[gold1][!][/gold1] Confidence of Abuse is [gold1]{ip_info_dict['abuseConfidenceScore']}[/gold1]")
                if verbosity:
                    for field in sorted(ip_info_dict.keys()):
                        if field not in ['abuseConfidenceScore', 'numDistinctUsers', 'totalReports']:
                            printc(f"[spring_green2][+][/spring_green2] {field}: {ip_info_dict[field]}")
                else:
                    for field in sorted(ip_info_dict.keys()):
                        if field in ['isp', 'isTor', 'isWhiteListed', 'usageType', 'lastReportedAt']:
                            printc(f"[spring_green2][+][/spring_green2] {field}: {ip_info_dict[field]}")
            else:
                printc("N/A")
        elif response.status_code == 401:
            printc(f"[red3][-][/red3] {response.json()['errors'][0]['detail']}")
            printc("[gold1][!][/gold1] Please read the documentation: https://github.com/0liverFlow/HookPhish/blob/main/README.md")
        else:
            printc(f"[red3][-][/red3] {response.text}")
            printc("[gold1][!][/gold1] Please report this issue at https://github.com/0liverFlow/HookPhish/issues")