-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbackend.py
164 lines (142 loc) · 6.48 KB
/
backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import json
import random
import secrets
from flask import Flask, render_template, request, abort, redirect, url_for
import requests
from datetime import datetime
from Cryptodome.Hash import SHA256
import os
app = Flask(__name__, template_folder='./frontend/templates', static_folder='./frontend/static')
# ngrok http https://localhost:5002
domain_of_passgate_api = "https://168a-2001-569-79ed-f100-88b7-5f8b-c54f-6e82.ngrok.io" + '/'
passgate_api_reqcode_url = "requestcode"
passgate_api_reqsms = "requestsms"
passgate_api_authtoken = "5iv3TYphzQu-ZEoWgpMaGp7RRHXeEWsQzc7A9h2RKL4"
auth_header = {'Authorization': 'Bearer ' + passgate_api_authtoken}
tokenDataMap = {}
def authorize(uname, p):
return True
def get_time():
return datetime.now().strftime("%H:%M:%S,%f")
@app.route('/', methods=['GET'])
def homepage_login():
return render_template('index.html')
@app.route('/login', methods=['POST'])
def login():
got_req = get_time()
log = {'START_user_pressed_login': datetime.now()}
pn = str(request.form['phone'])
h = SHA256.new()
h.update(pn.encode('utf-8'))
log['hashed_phone'] = h.hexdigest()
if authorize(pn, str(request.form['password'])):
requestedTimeout = 50
requestRecieve = 'Receive' in request.form.keys() and str(request.form['Receive']) == 'on'
if not requestRecieve:
log['call'] = 'true'
payload = {'phone': pn, 'to': str(requestedTimeout)}
log['sent_passgate_info_request'] = get_time()
result = requests.get(domain_of_passgate_api + passgate_api_reqcode_url, params=payload,
headers=auth_header)
log['received_passgate_info_answer'] = get_time()
json_answer = result.json()
code = int(json_answer['code'])
timeout = float(json_answer['timeout']) # in seconds
response_at = str(json_answer['response_at'])
generatedToken = str(secrets.token_urlsafe(32))
while generatedToken in tokenDataMap.keys():
generatedToken = str(secrets.token_urlsafe(32))
tokenDataMap.update({generatedToken: (code, timeout, response_at, log)})
ret = render_template('2fa.html', digit1=int((code / 10) % 10), digit2=(code % 10), phone_number=pn,
timeout=timeout, token=generatedToken)
return ret
else:
log['call'] = 'false'
payload = {'phone': pn}
generatedToken = str(random.randint(0, 100)) # used only for state keeping/logging ; === request uid
while generatedToken in tokenDataMap.keys():
generatedToken = str(random.randint(0, 100))
log['sent_passgate_info_request'] = get_time()
result = requests.get(domain_of_passgate_api + passgate_api_reqsms, params=payload, headers=auth_header)
log['received_passgate_info_answer'] = get_time()
tokenDataMap.update({generatedToken: ('SMS->unused', -1, 'SMS->unused', log)})
return render_template('SMS.html', response_at='auth/' + str(result.json()['token']) + '/SMS',
token=generatedToken)
else:
return render_template(
'index.html') # could also change to show error, but doesn't matter for our PoC, since we don't hold actual users DB
def save_log(log):
filename = 'logs/' + log['hashed_phone'][:6] + '/' + datetime.now().strftime("%Y-%m-%dT%H_%M_%S")
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w') as out_file:
js_dump = json.dumps(log)
out_file.write(js_dump)
@app.route('/auth_check/<token>')
def verify_auth(token):
req_time = get_time()
val = tokenDataMap.get(token)
if val is None:
# return error page
abort(404)
(code, timeout, response_at, log) = val
log['CALLONLY_user_page_loaded'] = req_time
log['CALLONLY_sending_passgate_call_request'] = get_time()
result = requests.get(domain_of_passgate_api + response_at, headers=auth_header)
log['END_received_passgate_answer'] = datetime.now()
json_answer = result.json() # json.loads('{"authorized":true}')
auth = json_answer['authorized']
log['success'] = auth
start_time = log['START_user_pressed_login']
log['START_user_pressed_login'] = start_time.strftime("%H:%M:%S,%f")
end_time = log['END_received_passgate_answer']
log['END_received_passgate_answer'] = end_time.strftime("%H:%M:%S,%f")
total_time = str(end_time - start_time)
log['TOTAL_TIME'] = total_time
save_log(log)
if auth:
# success, continue with authentication
# remove token from map
del tokenDataMap[token]
return render_template('success.html', success='true',tt=total_time)
else:
# failed auth
# TODO: decide if we directly remove token from map - depending if that's what our API will do
return render_template('success.html', success='false',tt=total_time)
@app.route('/verify_code', methods=['POST'])
def check_code():
rec_time = get_time()
value = tokenDataMap[request.form['token']]
assert value is not None
(a, b, c, log) = value
log['received_input'] = rec_time
input_code = request.form['code'] # crashes if 'code' not present
response_at = request.form['response_at']
log['SMSONLY_sending_passgate_verification_request'] = get_time()
result = requests.get(domain_of_passgate_api + response_at + '?code=' + str(input_code), headers=auth_header)
log['END_received_passgate_answer'] = datetime.now()
json_answer = result.json()
auth = json_answer['authorized']
log['success'] = auth
start_time = log['START_user_pressed_login']
log['START_user_pressed_login'] = start_time.strftime("%H:%M:%S,%f")
end_time = log['END_received_passgate_answer']
log['END_received_passgate_answer'] = end_time.strftime("%H:%M:%S,%f")
total_time = str(end_time - start_time)
log['TOTAL_TIME'] = total_time
save_log(log)
if auth:
return render_template('success.html', success='true',tt=total_time)
else:
# failed auth
return render_template('success.html', success='false',tt=total_time)
@app.route('/cancel')
def cancel_req():
value = tokenDataMap[request.form['token']]
print("Cancelled, token value: "+value)
assert value is not None
(a, b, c, log) = value
log['CANCEL_END'] = get_time()
save_log(log)
return redirect(url_for('homepage_login'))
if __name__ == '__main__':
app.run(threaded=True)