forked from sfbaker7/lightweight_tor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrypt.py
151 lines (136 loc) · 4.49 KB
/
crypt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import base64
import re
import cryptography
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
import ast
def gen_aes_key():
    '''
    Create a fresh symmetric key through the Fernet recipe layer.
    :rtype: A URL-safe base64-encoded 32-byte key
    '''
    return Fernet.generate_key()
def encrypt_aes(key, plaintext_bytes):
    '''
    Encrypt a message with a Fernet key.
    :param bytes key: Fernet key (e.g. the value returned by gen_aes_key)
    :param bytes plaintext_bytes: the message, in bytes, to be encrypted
    :rtype: bytes
    '''
    cipher = Fernet(key)
    return cipher.encrypt(plaintext_bytes)
def decrypt_aes(key, token):
    '''
    Decrypt a Fernet token with the given Fernet key.
    :param bytes key: Fernet key the token was encrypted with
    :param bytes token: token produced by Fernet encryption
    :rtype: bytes
    '''
    cipher = Fernet(key)
    return cipher.decrypt(token)
def gen_rsa_keypair(key_size=1024):
    '''
    Generate a new RSA key pair with public exponent 65537.
    :param int key_size: modulus length in bits. Defaults to 1024, the value
        that used to be hard-coded here, so existing callers see no change.
        NOTE(review): 1024-bit RSA is below current recommendations (the
        cryptography documentation advises at least 2048). Raising the
        default also changes the RSA ciphertext length, so confirm that no
        caller relies on the current length before doing so.
    :rtype: keypair objects (private_key, public_key)
    '''
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
        backend=default_backend(),
    )
    return private_key, private_key.public_key()
def encrypt_rsa(public_key, message):
    '''
    Encrypt message with an RSA public key using OAEP padding
    (MGF1 with SHA-256, SHA-256 hash, no label).
    :param public_key: RSA public key object exposing encrypt()
    :param bytes message: data to encrypt
    :rtype: whatever public_key.encrypt() returns (bytes for cryptography
        RSA keys, which is also what decrypt_rsa requires)
    '''
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return public_key.encrypt(message, oaep)
def decrypt_rsa(private_key, ciphertext):
    '''
    Decode ciphertext using RSA private key with OAEP padding
    (MGF1 with SHA-256, SHA-256 hash, no label).
    :param: bytes/rsa_object private_key -- an rsa.RSAPrivateKey, or a PEM
        encoding of one, which is loaded through load_private_pem
    :param: bytes ciphertext
    :rtype: the decrypted plaintext returned by private_key.decrypt()
    :raises TypeError: if ciphertext is not a bytes object
    '''
    if not isinstance(ciphertext, bytes):
        # TypeError is still an Exception, so callers catching the old bare
        # Exception keep working; the message is now one readable string
        # instead of a tuple of arguments.
        raise TypeError(
            'Ciphertext should be of byte format, not {}'.format(
                type(ciphertext).__name__
            )
        )

    # Accept a PEM-encoded key as a convenience and turn it into a key object.
    if not isinstance(private_key, rsa.RSAPrivateKey):
        private_key = load_private_pem(private_key)

    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return private_key.decrypt(ciphertext, oaep)
def get_pem_format(private_key, public_key):
    '''
    Serialize a key pair to PEM. The private key is written unencrypted in
    TraditionalOpenSSL format; the public key in SubjectPublicKeyInfo format.
    :param private_key: private key object exposing private_bytes()
    :param public_key: public key object exposing public_bytes()
    :rtype: (private_pem, public_pem) as returned by the key objects
    '''
    pem = serialization.Encoding.PEM
    private_pem = private_key.private_bytes(
        encoding=pem,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_pem = public_key.public_bytes(
        encoding=pem,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return private_pem, public_pem
def load_private_pem(private_key_pem):
    '''
    Build a private_key object from its PEM encoding.
    The PEM data is loaded with no password (password=None).
    :param private_key_pem: PEM-encoded private key
    :rtype: private key object
    '''
    return serialization.load_pem_private_key(
        private_key_pem,
        password=None,
        backend=default_backend(),
    )
def encrypt(AES_key, public_key_pem, payload):
    '''
    Hybrid encryption: the payload is encrypted with the AES (Fernet) key,
    and the AES key itself is encrypted with the RSA public key.
    :param AES_key: Fernet key used for the payload
    :param public_key_pem: PEM-encoded RSA public key
    :param payload: data to encrypt with AES_key
    :rtype: (rsa_encrypt(AES_key), aes_key_encrypt(payload))
    '''
    rsa_public_key = serialization.load_pem_public_key(
        public_key_pem,
        backend=default_backend(),
    )
    wrapped_payload = encrypt_aes(AES_key, payload)
    wrapped_key = encrypt_rsa(rsa_public_key, AES_key)
    return wrapped_key, wrapped_payload
def decrypt():
    '''
    Placeholder with no implementation: takes no arguments, returns None.
    '''
    return None
# Patterns decrypt_payload uses to recognise the destination inside a
# decrypted layer. Compiled once at import time.
_URL_RE = re.compile(
    # The dot after "www" is escaped so it only matches a literal ".".
    r'^((https?|ftp|smtp):\/\/)?(www\.)?[a-z0-9]+\.[a-z]+(\/[a-zA-Z0-9#]+\/?)*$'
)
_LOCALHOST_RE = re.compile(r'localhost:\d{4}')
_IP_ADDR_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')


def decrypt_payload(AES_key, payload):
    '''
    decrypt payload, try to match for valid url, else next relay node

    The decrypted text is checked in this order:
    1. the whole text is a URL  -> (url, '')
    2. it contains localhost:PORT (4-digit port) -> (that address, rest)
    3. it contains a dotted-quad IP address      -> (that address, rest)
    "rest" is the decrypted text with only the matched address removed, so
    an identical substring further inside the message is left intact.

    :param AES_key: Fernet key for this layer
    :param payload: Fernet token to decrypt
    rtype: string destination_url, empty string
    rtype: string relay_node_ip, next layer of encrypted message
    :raises ValueError: if no URL, localhost or IP address is found
    Errors from decrypt_aes or from UTF-8 decoding are not caught here.
    '''
    decrypted_payload = decrypt_aes(AES_key, payload).decode('UTF8')

    url_match = _URL_RE.search(decrypted_payload)
    if url_match is not None:
        return url_match.group(), ''

    # localhost takes precedence over a bare IP address. Match objects are
    # always truthy, so `or` picks the first pattern that matched.
    relay_match = (_LOCALHOST_RE.search(decrypted_payload)
                   or _IP_ADDR_RE.search(decrypted_payload))
    if relay_match is None:
        raise ValueError('No match was found')

    # Cut out exactly the matched span rather than every occurrence of the
    # same text.
    message = (decrypted_payload[:relay_match.start()]
               + decrypted_payload[relay_match.end():])
    return relay_match.group(), message