-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqure.py
285 lines (225 loc) · 9.19 KB
/
qure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""
Filename: qure.py
Author: Ziyu Ye
A Python package for a Post-Quantum safe future of Web3.
"""
import argparse
import base64
import binascii
import json
import os
import sys
import time
from collections import OrderedDict
from getpass import getpass
from pathlib import Path
from pprint import pprint

import oqs
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from web3 import Web3
def bytes_to_str(text_bytes):
    """
    Render raw bytes as a hexadecimal string.
    Input: text_bytes (bytes): the binary data to render
    Return: str holding two hex digits per input byte
    """
    # hexlify yields ASCII hex digits as bytes; decode them into a str
    hex_digits = binascii.hexlify(text_bytes)
    return str(hex_digits, 'utf-8')
def str_to_bytes(text_str):
    """
    Parse a hexadecimal string back into raw bytes.
    Input: text_str (str): an even-length string of hex digits
    Return: bytes decoded from the hex digits
    """
    # a2b_hex is the same binascii routine as unhexlify
    hex_digits = text_str.encode('utf-8')
    return binascii.a2b_hex(hex_digits)
def key_transform(key: bytes = b'\xfe',
                  salt: bytes = b'5\xa3',
                  iterations: int = 480000):
    """
    This function transforms a KEM key to the format
    of 32 url-safe base64-encoded bytes.
    Input:
        key (bytes in binary format): the key from KEM
        salt (bytes): a string generated by os.urandom(16)
        iterations (int): the PBKDF2 iteration count; encryption and
            decryption must use the same value (the default keeps the
            previously hard-coded 480000)
    Return:
        key_ (bytes in base64 format): the key used for SKE
    Source: cryptography.io/en/latest/
        fernet/#using-passwords-with-fernet
    """
    # The hex encoding of the KEM key is what the KDF receives as its password
    password = binascii.hexlify(key)
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(),
                     length=32,
                     salt=salt,
                     iterations=iterations)
    # Derive 32 raw bytes, then base64-encode them url-safely
    key_ = base64.urlsafe_b64encode(kdf.derive(password))
    return key_
def hybrid_encryption(message: str = '8964',
                      kem_algo: str = 'Kyber512'):
    """
    This function encrypts the message using KEM (Key Encapsulation
    Mechanism) and SKE (Symmetric Key Encryption).
    Inputs:
        message (str): the private key used on blockchain transaction
        kem_algo (str): the KEM algorithm
    Return:
        A 5-tuple (ciphertext, public_key, secret_key, salt,
        message_encrypted):
            ciphertext: the KEM encapsulation of the shared key
            public_key: the KEM public key
            secret_key: the KEM secret key exported from the sender
            salt: the 16 random bytes used by key_transform
            message_encrypted: the Fernet token of the message
    """
    # Scope the KEM object with `with` so it is released as soon as the
    # key material has been extracted, even if a step below raises.
    with oqs.KeyEncapsulation(kem_algo) as sender:
        # Sender generates the key pair
        public_key = sender.generate_keypair()
        secret_key = sender.export_secret_key()

        # Sender generates ciphertext and the shared key
        ciphertext, key = sender.encap_secret(public_key)

    # Turn the shared key into a Fernet-compatible symmetric key
    salt = os.urandom(16)
    key_ = key_transform(key, salt)

    # Encrypt the private key used on blockchain by the transformed key
    fernet = Fernet(key_)
    message_encrypted = fernet.encrypt(message.encode())

    # Notice that the shared key itself is never returned or saved
    return ciphertext, public_key, secret_key, salt, message_encrypted
def hybrid_decryption(kem_algo: str = 'Kyber512',
                      secret_key: bytes = b'\x13"l',
                      ciphertext: bytes = b'\xbb',
                      salt: bytes = b'u\xff',
                      message_encrypted: bytes = b'8964'):
    """
    This function decrypts the message encrypted using KEM
    (Key Encapsulation Mechanism) and SKE (Symmetric Key Encryption).
    Inputs:
        kem_algo (str): the KEM algorithm used for encryption
        secret_key (bytes): the KEM secret key
        ciphertext (bytes): the KEM encapsulation of the shared key
        salt (bytes): the salt that was passed to key_transform
        message_encrypted (bytes): the Fernet token to decrypt
    Return:
        The decrypted message as a string.
    """
    # Scope the KEM object with `with` so it is released once the shared
    # key has been recovered, even if decapsulation raises.
    with oqs.KeyEncapsulation(kem_algo, secret_key) as receiver:
        key = receiver.decap_secret(ciphertext)

    # Rebuild the same symmetric key that encryption derived
    fernet = Fernet(key_transform(key, salt))
    return fernet.decrypt(message_encrypted).decode()
def safebox(user_id: str = 'leviathan',
            message: str = '217d5c81a38240e09ee05bc4ab3efce2eb91d24973162f9833ed18aa7b4460b9',
            kem_algo: str = 'Kyber512',
            root: str = './database'):
    """
    Encrypt a message with hybrid_encryption and store the result as
    JSON records under <root>/<user_id>/.
    Inputs:
        user_id (str): name of the user's sub-directory under root
        message (str): the private key used on blockchain transaction
        kem_algo (str): the KEM algorithm
        root (str): the directory holding all user directories
    Return:
        None. Two files are written: '<key_id>.json' (the full record)
        and '<key_id>_local.json' (the fields the user must keep), and
        the local fields are printed.
    """
    # Get timing
    start_time = time.time()

    # Run hybrid encryption
    (ciphertext, public_key, secret_key,
     salt, message_encrypted) = hybrid_encryption(message, kem_algo)

    # Make sure the directory for the user exists (no check-then-create race)
    user_dir = Path(root) / f'{user_id}'
    os.makedirs(user_dir, exist_ok=True)

    # Assign the key id for the user from the number of directory entries.
    # If that id is already taken (e.g. files were removed or added by
    # hand), move on to the next free one so that an existing record is
    # never overwritten.
    key_id = len(os.listdir(user_dir))
    while ((user_dir / f'{key_id}.json').exists()
           or (user_dir / f'{key_id}_local.json').exists()):
        key_id += 1

    # The user dict saved in our database
    # NOTE(review): the KEM secret key is stored in the same file as the
    # ciphertext it protects; confirm this is the intended trust model.
    ciphertext_hex = bytes_to_str(ciphertext)
    user_dict = {'user_id': user_id,
                 'key_id': key_id,
                 'encryption_algo': kem_algo,
                 'ciphertext_abs': ciphertext_hex[:32],
                 'ciphertext': ciphertext_hex,
                 'public_key': bytes_to_str(public_key),
                 'secret_key': bytes_to_str(secret_key),
                 'salt': bytes_to_str(salt),
                 'message_encrypted': bytes_to_str(message_encrypted)}

    # The user dict saved by the user
    user_dict_local = {field: user_dict[field] for field in
                       ['user_id', 'key_id', 'encryption_algo', 'ciphertext_abs']}

    # Dump the data; `with` closes each file even if serialisation fails
    with open(user_dir / f'{key_id}.json', 'w', encoding='utf-8') as file:
        json.dump(user_dict, file, indent=4)
    with open(user_dir / f'{key_id}_local.json', 'w', encoding='utf-8') as file_local:
        json.dump(user_dict_local, file_local, indent=4)

    # Leave user a message
    print('\n🔐 Your private key are now encrypted '
          'by our post quantum-safe cryptography technology!')

    # Print the must hold info for the user
    print('\n============= 🖍 Your info for this safebox =============')
    for k, v in user_dict_local.items():
        print(f'{[k]} {v}')
    print('=========================================================')

    # Get time
    print(f'\nTime elapsed: {time.time() - start_time:.3f}s.')

    return None
def safe_transaction(user_id: str = 'leviathan',
                     key_id: str = '0',
                     ciphertext_abs: str = '7f9da539c7ca9cbab104cb2a76d93be0',
                     account_from: str = '0xA5b372E60a60A70f2c6ACB87710eba30Ecc4D479',
                     account_to: str = '0xA059250F4b97bbB3C081f6D9e1fC7249c6Ea2A0c',
                     value: float = 0.0001,
                     url: str = 'https://eth-goerli.g.alchemy.com/v2/fss_yq1JH8COJapGjbQCuaCD77JrjQRp',
                     chain_id: int = 5,
                     gas: int = 21000,
                     gas_price: int = 200000000,
                     root: str = './database'):
    """
    Decrypt the stored private key of a user and use it to sign and
    send an ether transfer.
    Inputs:
        user_id (str): name of the user's sub-directory under root
        key_id (str): id of the stored record ('<key_id>.json')
        ciphertext_abs (str): the ciphertext abstract held by the user;
            it must equal the one saved in the record
        account_from (str): address of the paying account
        account_to (str): address of the receiving account
        value (float): amount to send, in ether
        url (str): HTTP endpoint of the Web3 provider
        chain_id (int): chain id put into the transaction
        gas (int): gas limit put into the transaction
        gas_price (int): gas price put into the transaction
        root (str): the directory holding all user directories
    Return:
        None. The transaction id is printed.
    Raises:
        SystemExit (exit code 1) after printing a message when the
        provider is unreachable, the user directory or the record file
        is missing, or the ciphertext abstract does not match.
    NOTE(review): the default `url` appears to embed a provider API
    key; consider passing it in from configuration instead.
    """
    # ============================================
    # 1. Setting Up Web3
    # ============================================
    # Connecting to Web3
    web3 = Web3(Web3.HTTPProvider(url))
    if not web3.isConnected():
        # Fail early instead of continuing with an unusable provider
        print('Unable to connect to Web3. Force quitting now.')
        sys.exit(1)
    print(f'\n🚀 You are now connected to Web3!'
          f'\nCurrent block number on the chain is {web3.eth.blockNumber}.')

    # Setting up the account
    print('\n🛰 Validating account addresses...')
    account_from = Web3.toChecksumAddress(account_from)
    account_to = Web3.toChecksumAddress(account_to)
    print('Account addresses are valid.')

    # Get the nonce of the payer account
    nonce = web3.eth.getTransactionCount(account_from)

    # Build the transaction
    tx = {'nonce': nonce,
          'to': account_to,
          'value': web3.toWei(value, 'ether'),
          'gas': gas,
          'gasPrice': gas_price,
          'chainId': chain_id}

    # ============================================
    # 2. Verify the user records for decryption
    # ============================================
    # Get timing
    start_time = time.time()

    # Check the directory for user
    print('\n🛸 Verifying your directory in SeQure safebox...')
    user_dir = Path(root) / f'{user_id}'
    if not os.path.exists(user_dir):
        print('No directory found for this user. Force quitting now.')
        sys.exit(1)

    # Check the record for this key id
    record_path = user_dir / f'{key_id}.json'
    if not record_path.is_file():
        print('No record found for this key id. Force quitting now.')
        sys.exit(1)
    with open(record_path) as file:
        user_dict = json.load(file)

    # Verify the ciphertext abstract
    if user_dict['ciphertext_abs'] != ciphertext_abs:
        print('Ciphertext not matched for this info. Force quitting now.')
        sys.exit(1)

    # Load the data from the user file
    print('Ciphertext abstract matched!')
    kem_algo = user_dict['encryption_algo']
    secret_key = str_to_bytes(user_dict['secret_key'])
    ciphertext = str_to_bytes(user_dict['ciphertext'])
    salt = str_to_bytes(user_dict['salt'])
    message_encrypted = str_to_bytes(user_dict['message_encrypted'])

    # ============================================
    # 3. Make the transaction
    # ============================================
    # Sign the transaction with the decrypted private key
    print('\n👨🚀 Your transaction is now SeQurely protected by our post quantum-safe cryptography technology...')
    print('Decrypting...')
    private_key = hybrid_decryption(kem_algo,
                                    secret_key,
                                    ciphertext,
                                    salt,
                                    message_encrypted)
    signed_tx = web3.eth.account.signTransaction(tx, private_key)

    # Send the signed transaction
    tx_hash = web3.eth.sendRawTransaction(signed_tx.rawTransaction)

    # Print the message
    print(f'Your transaction ID: {web3.toHex(tx_hash)}.')
    print('The transaction is successful.')

    # Get time
    print(f'\nTime elapsed: {time.time() - start_time:.3f}s.')

    return None