+ Manage {{ client.get("name") }}
+
+ {%- if saving %}
+
+ {%- elif manager_type %}
+
+ {%- else %}
+
+ {%- endif %}
+
+
+{% endblock %}
diff --git a/wsgi.py b/wsgi.py
index b71d93b..f3c992b 100644
--- a/wsgi.py
+++ b/wsgi.py
@@ -11,6 +11,7 @@
import jwt_signing
import sso_oidc
import werkzeug
+import secrets
from datetime import timedelta
from flask import (
@@ -42,6 +43,7 @@
CheckCSRFSession,
UserShouldBeSignedIn,
)
+from sso_oidc_client_generation import generate_client_auth_pair
ENVIRONMENT = env_var("ENVIRONMENT", "development")
jprint("Starting wsgi.py - ENVIRONMENT:", ENVIRONMENT)
@@ -50,9 +52,10 @@
IS_PROD = ENVIRONMENT.lower().startswith("prod")
DEBUG = not IS_PROD
-IS_ADMIN = env_var("IS_ADMIN", "f", return_bool=True)
IS_HTTPS = env_var("IS_HTTPS", "f", return_bool=True)
+SUPERUSERS = env_var("SUPERUSERS", "")
+
COOKIE_PREFIX = "__Host-" if IS_HTTPS else ""
COOKIE_NAME_SESSION = f"{COOKIE_PREFIX}Session-SSO"
COOKIE_NAME_BROWSER = f"{COOKIE_PREFIX}Browser-SSO"
@@ -271,9 +274,11 @@ def get_request_vals(
)
return {
- k: (res[k][0] if len(res[k]) == 1 else res[k])
- if type(res[k]) == list
- else res[k]
+ k: (
+ (res[k][0] if len(res[k]) == 1 else res[k])
+ if type(res[k]) == list
+ else res[k]
+ )
for k in res
}
@@ -723,14 +728,16 @@ def microsoft_callback():
jprint(
{
"sub": session["sub"],
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": client_ip(),
"client_country": session["init_country"],
- "microsoft_auth_ip": id_token["ipaddr"]
- if "ipaddr" in id_token
- else None,
+ "microsoft_auth_ip": (
+ id_token["ipaddr"] if "ipaddr" in id_token else None
+ ),
"action": "microsoft-sign-in-successful",
"browser_cookie_value": browser_cookie_value,
}
@@ -868,9 +875,11 @@ def google_callback():
jprint(
{
"sub": session["sub"],
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": client_ip(),
"client_country": session["init_country"],
"action": "google-sign-in-successful",
@@ -889,7 +898,7 @@ def google_callback():
@app.route("/auth/oidc", methods=["GET", "POST"])
@SetBrowserCookie
def auth_oidc():
- client = {"ok": False}
+ client = {"_ok": False}
# ==
# get tmp_client_id
@@ -908,7 +917,7 @@ def auth_oidc():
tmp_implicit_jwt = False
if tmp_client_id is not None:
client = sso_oidc.get_client(tmp_client_id)
- if not client["ok"]:
+ if not client["_ok"]:
return redirect("/error?type=client-id-unknown")
session["oidc_client_id"] = tmp_client_id
if "implicit_jwt" in client:
@@ -948,7 +957,7 @@ def auth_oidc():
# get tmp_response_mode
# ==
tmp_response_mode = None
- if client["ok"] and "response_mode" in client and client["response_mode"]:
+ if client["_ok"] and "response_mode" in client and client["response_mode"]:
tmp_response_mode = client["response_mode"]
else:
if "response_mode" in request.args:
@@ -972,7 +981,7 @@ def auth_oidc():
raw_redirect_url = None
redirect_url_attribute = "redirect_uri"
if (
- client["ok"]
+ client["_ok"]
and "redirect_uri_override" in client
and client["redirect_uri_override"]
):
@@ -992,7 +1001,7 @@ def auth_oidc():
session.pop("oidc_redirect_uri", None)
tmp_redirect_url = None
- if client["ok"]:
+ if client["_ok"]:
session["oidc_redirect_client"] = True
if "redirect_url_override" in client and client["redirect_url_override"]:
urlor = client["redirect_url_override"]
@@ -1029,7 +1038,7 @@ def auth_oidc():
# get tmp_scope
# ==
tmp_scope = []
- if client["ok"] and "scope_override" in client and client["scope_override"]:
+ if client["_ok"] and "scope_override" in client and client["scope_override"]:
tmp_scope = client["scope_override"]
else:
if "scope" in request.args:
@@ -1202,7 +1211,7 @@ def signout(country_missmatch: bool = False):
if "from_app" in request.args:
page_params = {}
client = sso_oidc.get_client(request.args.get("from_app", None))
- if client["ok"]:
+ if client["_ok"]:
if "name" in client:
page_params.update({"from_app_name": client["name"]})
if "app_url" in client:
@@ -1231,12 +1240,151 @@ def signout(country_missmatch: bool = False):
)
if to_client:
client = sso_oidc.get_client(to_client)
- if client["ok"] and "app_url" in client:
+ if client["_ok"] and "app_url" in client:
redirect_url = client["app_url"]
return redirect(redirect_url)
+@app.route("/manage", methods=["GET", "POST"])
+@UserShouldBeSignedIn
+@SetBrowserCookie
+def route_manage():
+ cc = country_check()
+ if cc:
+ return cc
+
+ if "csrf_token" not in session:
+ session["csrf_token"] = secrets.token_urlsafe(24)
+
+ saving = request.method == "POST"
+ reset_secret = False
+
+ if saving:
+ posted_csrf_token = request.form.get("csrf_token", None)
+ if not posted_csrf_token:
+ return redirect("/dashboard?error=no-csrf-token")
+
+ session_csrf_token = session.get("csrf_token", None)
+ if not session_csrf_token:
+ return redirect("/dashboard?error=missing-csrf-token")
+
+ if session_csrf_token != posted_csrf_token:
+ return redirect("/dashboard?error=bad-csrf-token")
+
+ reset_secret = request.form.get("reset_secret", None)
+
+ client_id = None
+ client = {}
+
+ raw_client_id = request.args.get("client_id", None)
+ if not raw_client_id:
+ raw_client_id = request.form.get("client_id", None)
+
+ if raw_client_id:
+ client = dict.copy(sso_oidc.get_client(client_id=raw_client_id))
+ if client and client.get("_ok", False):
+ client_id = raw_client_id
+
+ if not client_id:
+ jprint(
+ {"path": "/manage", "error": "Bad/missing client ID", "user": users_email}
+ )
+ return redirect("/dashboard?error=invalid-client-id")
+
+ owners = client.get("owners", [])
+ managers = client.get("managers", [])
+
+ manager_type = None
+
+ users_email = session.get("email", {}).get("email", None)
+
+ if users_email and users_email in SUPERUSERS:
+ manager_type = "owner"
+ elif users_email and users_email in owners:
+ manager_type = "owner"
+ elif users_email and users_email in managers:
+ manager_type = "manager"
+
+ if not manager_type:
+ jprint(
+ {
+ "path": "/manage",
+ "error": "Access denied",
+ "client_id": client_id,
+ "user": users_email,
+ }
+ )
+ return redirect("/dashboard?error=management-no-access")
+
+ client_json = None
+ client_json_lines = 0
+
+ if not saving:
+ for client_item in list(client.keys()):
+ if client_item.startswith("_"):
+ client.pop(client_item, None)
+
+ client.pop("client_id", None)
+ if client.pop("secret", None):
+ client["secret"] = "REDACTED"
+
+ client_json = json.dumps(client, default=str, indent=2)
+ client_json_lines = len(client_json.split("\n"))
+
+ save_success = False
+
+ new_secret = None
+
+ if saving:
+ try:
+ raw_config = request.form.get("config", None)
+ new_config = json.loads(raw_config)
+ if new_config:
+ if manager_type == "owner" and reset_secret == "reset_secret":
+ new_secret = generate_client_auth_pair().get("client_secret")
+ new_config["secret"] = new_secret
+ else:
+ new_config["secret"] = client["secret"]
+
+ if manager_type == "manager":
+ new_config["owners"] = client.get("owners", [])
+ new_config["managers"] = client.get("managers", [])
+
+ save_success = sso_oidc.save_client(
+ filename=client.get("_filename", None),
+ client=new_config,
+ client_id=client_id,
+ )
+ except Exception as err:
+ jprint(
+ {
+ "path": "/manage",
+ "error": err,
+ "client_id": client_id,
+ "user": users_email,
+ }
+ )
+
+ return renderTemplate(
+ "manage.html",
+ {
+ "csrf_token": session.get("csrf_token", None),
+ "session": session,
+ "client_id": client_id,
+ "manager_type": manager_type,
+ "new_secret": new_secret,
+ "client": client,
+ "client_json": client_json,
+ "client_json_lines": client_json_lines,
+ "saving": saving,
+ "save_success": save_success,
+ "title": "Manage",
+ "nav_item": "manage",
+ },
+ )
+
+
@app.route("/dashboard", methods=["GET"])
@UserShouldBeSignedIn
@SetBrowserCookie
@@ -1245,51 +1393,49 @@ def dashboard():
if cc:
return cc
+ users_email = session.get("email", {}).get("email", None)
+
allowed_apps = {}
all_clients = sso_oidc.get_clients()
- for client in all_clients:
- ve = valid_email(session["email"]["email"], all_clients[client], debug=DEBUG)
+ for client_id in all_clients:
+ client = all_clients[client_id]
+ ve = valid_email(users_email, client, debug=DEBUG)
if ve["valid"]:
- name = (
- all_clients[client]["name"] if "name" in all_clients[client] else None
- )
+ name = client["name"] if "name" in client else None
button_text = (
- all_clients[client]["dashboard_button_override"]
- if "dashboard_button_override" in all_clients[client]
+ client["dashboard_button_override"]
+ if "dashboard_button_override" in client
else f"Open {name}"
)
- description = (
- all_clients[client]["description"]
- if "description" in all_clients[client]
- else None
+ can_manage = (
+ True
+ if (
+ users_email
+ in (client.get("owners", []) + client.get("managers", []))
+ or users_email in SUPERUSERS
+ )
+ else False
)
- app_url = (
- all_clients[client]["app_url"]
- if "app_url" in all_clients[client]
- else None
- )
+ description = client["description"] if "description" in client else None
- sign_in_url = (
- all_clients[client]["sign_in_url"]
- if "sign_in_url" in all_clients[client]
- else app_url
- )
+ app_url = client["app_url"] if "app_url" in client else None
+
+ sign_in_url = client["sign_in_url"] if "sign_in_url" in client else app_url
dashboard_display = (
- all_clients[client]["dashboard_display"]
- if "dashboard_display" in all_clients[client]
- else True
+ client["dashboard_display"] if "dashboard_display" in client else True
)
- allowed_apps[client] = {
+ allowed_apps[client_id] = {
"name": name,
"description": description,
"sign_in_url": sign_in_url,
"button_text": button_text,
"dashboard_display": dashboard_display,
+ "can_manage": can_manage,
}
return renderTemplate(
@@ -1297,9 +1443,9 @@ def dashboard():
{
"session": session,
"allowed_apps": allowed_apps,
+ "users_email": users_email,
"title": "Dashboard",
"nav_item": "dashboard",
- "IS_ADMIN": IS_ADMIN,
},
)
@@ -1312,7 +1458,6 @@ def help():
"session": session,
"title": "Help",
"nav_item": "help",
- "IS_ADMIN": IS_ADMIN,
},
)
@@ -1325,7 +1470,6 @@ def terms():
"session": session,
"title": "Terms of Service",
"nav_item": "terms",
- "IS_ADMIN": IS_ADMIN,
},
)
@@ -1338,7 +1482,6 @@ def privacy_notice():
"session": session,
"title": "Privacy Notice",
"nav_item": "privacy-notice",
- "IS_ADMIN": IS_ADMIN,
},
)
@@ -1353,7 +1496,6 @@ def profile_saved():
"session": session,
"title": "Profile Saved",
"nav_item": "profile",
- "IS_ADMIN": IS_ADMIN,
},
)
@@ -1427,7 +1569,6 @@ def profile():
"sms_number": sms_number,
"title": "Profile",
"nav_item": "profile",
- "IS_ADMIN": IS_ADMIN,
},
)
@@ -1440,9 +1581,11 @@ def country_check():
jprint(
{
"sub": session["sub"],
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": client_ip(),
"client_country": country,
"action": "country-missmatch-signout",
@@ -1648,15 +1791,21 @@ def signin():
template_id=env_var("NOTIFY_EMAIL_AUTH_CODE_TEMPLATE"),
personalisation={
"auth_code": pretty_code,
- "display_name": user_attributes["display_name"]
- if "display_name" in user_attributes
- and user_attributes["display_name"]
- else session["email"]["email"],
- "obfuscated_ip ": (".".join(c_ip.split(".")[:-1]) + ".*")
- if "." in c_ip
- else (":".join(c_ip.split(":")[:-1]) + ":*")
- if ":" in c_ip
- else "Unknown",
+ "display_name": (
+ user_attributes["display_name"]
+ if "display_name" in user_attributes
+ and user_attributes["display_name"]
+ else session["email"]["email"]
+ ),
+ "obfuscated_ip ": (
+ (".".join(c_ip.split(".")[:-1]) + ".*")
+ if "." in c_ip
+ else (
+ (":".join(c_ip.split(":")[:-1]) + ":*")
+ if ":" in c_ip
+ else "Unknown"
+ )
+ ),
"country": country,
"domain": DOMAIN,
"browser_guess": guess_browser(
@@ -1678,9 +1827,11 @@ def signin():
jprint(
{
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": c_ip,
"client_country": country,
"action": "sign-in-request-email",
@@ -1714,9 +1865,11 @@ def signin():
if code != session["sms-sign-in-code"]:
jprint(
{
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": client_ip(),
"client_country": country,
"action": "code-fail",
@@ -1744,9 +1897,11 @@ def signin():
{
"code": code,
"email-sign-in-code": session["email-sign-in-code"],
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": client_ip(),
"client_country": country,
"action": "code-fail",
@@ -1782,10 +1937,12 @@ def signin():
template_id=env_var("NOTIFY_SMS_AUTH_CODE_TEMPLATE"),
personalisation={
"sms_auth_code": pretty_code,
- "display_name": user_attributes["display_name"]
- if "display_name" in user_attributes
- and user_attributes["display_name"]
- else session["email"]["email"],
+ "display_name": (
+ user_attributes["display_name"]
+ if "display_name" in user_attributes
+ and user_attributes["display_name"]
+ else session["email"]["email"]
+ ),
},
)
except Exception as e:
@@ -1796,15 +1953,17 @@ def signin():
jprint(
{
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": c_ip,
"client_country": country,
"action": "sign-in-request-sms",
- "sms_number": user_attributes["sms_number"]
- if DEBUG
- else "REDACTED",
+ "sms_number": (
+ user_attributes["sms_number"] if DEBUG else "REDACTED"
+ ),
"pretty_code": pretty_code if DEBUG else "REDACTED",
"browser_cookie_value": browser_cookie_value,
}
@@ -1830,9 +1989,11 @@ def signin():
jprint(
{
"sub": session["sub"],
- "email": session["email"]["email"]
- if "email" in session and "email" in session["email"]
- else "",
+ "email": (
+ session["email"]["email"]
+ if "email" in session and "email" in session["email"]
+ else ""
+ ),
"client_ip": client_ip(),
"client_country": country,
"action": "sign-in-successful",
@@ -1887,9 +2048,9 @@ def return_sign_in(
"is_error": is_error,
"fail_message": fail_message or "Email address wasn't recognised",
"force_email": force_email,
- "email_callback": session.get("email", {}).get("email", None)
- if force_email
- else None,
+ "email_callback": (
+ session.get("email", {}).get("email", None) if force_email else None
+ ),
"code_fail": code_fail,
"title": code_type.title(),
"form_url": "/sign-in",
@@ -1898,7 +2059,7 @@ def return_sign_in(
}
client = sso_oidc.get_client(to_app)
- if client["ok"]:
+ if client["_ok"]:
if "name" in client:
page_params.update({"to_app_name": client["name"]})