diff --git a/moonraker/authorization.py b/moonraker/authorization.py deleted file mode 100644 index c8cd65ffd..000000000 --- a/moonraker/authorization.py +++ /dev/null @@ -1,932 +0,0 @@ -# API Key Based Authorization -# -# Copyright (C) 2020 Eric Callahan -# -# This file may be distributed under the terms of the GNU GPLv3 license - -from __future__ import annotations -import asyncio -import base64 -import uuid -import hashlib -import secrets -import os -import pyotp -import time -import datetime -import ipaddress -import re -import socket -import logging -from tornado.web import HTTPError -from libnacl.sign import Signer, Verifier -from ..utils import json_wrapper as jsonw -from ..common import RequestType, TransportType - -# Annotation imports -from typing import ( - TYPE_CHECKING, - Any, - Tuple, - Optional, - Union, - Dict, - List, -) - -if TYPE_CHECKING: - from ..confighelper import ConfigHelper - from ..common import WebRequest - from .websockets import WebsocketManager - from tornado.httputil import HTTPServerRequest - from .database import MoonrakerDatabase as DBComp - from .ldap import MoonrakerLDAP - IPAddr = Union[ipaddress.IPv4Address, ipaddress.IPv6Address] - IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network] - OneshotToken = Tuple[IPAddr, Optional[Dict[str, Any]], asyncio.Handle] - -# Helpers for base64url encoding and decoding -def base64url_encode(data: bytes) -> bytes: - return base64.urlsafe_b64encode(data).rstrip(b"=") - -def base64url_decode(data: str) -> bytes: - pad_cnt = len(data) % 4 - if pad_cnt: - data += "=" * (4 - pad_cnt) - return base64.urlsafe_b64decode(data) - - -ONESHOT_TIMEOUT = 5 -TRUSTED_CONNECTION_TIMEOUT = 3600 -FQDN_CACHE_TIMEOUT = 84000 -PRUNE_CHECK_TIME = 300. - -AUTH_SOURCES = ["moonraker", "ldap"] -HASH_ITER = 100000 -API_USER = "_API_KEY_USER_" -TRUSTED_USER = "_TRUSTED_USER_" -RESERVED_USERS = [API_USER, TRUSTED_USER] -JWT_EXP_TIME = datetime.timedelta(hours=1) -JWT_HEADER = { - 'alg': "EdDSA", - 'typ': "JWT" -} - -class Authorization: - def __init__(self, config: ConfigHelper) -> None: - self.server = config.get_server() - self.login_timeout = config.getint('login_timeout', 90) - self.force_logins = config.getboolean('force_logins', False) - self.default_source = config.get('default_source', "moonraker").lower() - self.enable_api_key = config.getboolean('enable_api_key', True) - self.enable_totp = config.getboolean('enable_totp', False) - self.max_logins = config.getint("max_login_attempts", None, above=0) - self.failed_logins: Dict[IPAddr, int] = {} - self.fqdn_cache: Dict[IPAddr, Dict[str, Any]] = {} - if self.default_source not in AUTH_SOURCES: - self.server.add_warning( - "[authorization]: option 'default_source' - Invalid " - f"value '{self.default_source}', falling back to " - "'moonraker'." - ) - self.default_source = "moonraker" - self.ldap: Optional[MoonrakerLDAP] = None - if config.has_section("ldap"): - self.ldap = self.server.load_component(config, "ldap", None) - if self.default_source == "ldap" and self.ldap is None: - self.server.add_warning( - "[authorization]: Option 'default_source' set to 'ldap'," - " however [ldap] section failed to load or not configured" - ) - database: DBComp = self.server.lookup_component('database') - database.register_local_namespace('authorized_users', forbidden=True) - self.user_db = database.wrap_namespace('authorized_users') - self.users: Dict[str, Dict[str, Any]] = self.user_db.as_dict() - api_user: Optional[Dict[str, Any]] = self.users.get(API_USER, None) - if api_user is None: - self.api_key = uuid.uuid4().hex - self.users[API_USER] = { - 'username': API_USER, - 'api_key': self.api_key, - 'created_on': time.time() - } - else: - self.api_key = api_user['api_key'] - if (self.enable_totp): - database.register_local_namespace( - 'user_totp_secret_storage', - forbidden=True) - self.totp_secret_db = database.wrap_namespace('user_totp_secret_storage') - self.totp_secrets: Dict[str, Dict[str, Union[str, bool]]] = ( - self.totp_secret_db.as_dict() - ) - hi = self.server.get_host_info() - self.issuer = f"http://{hi['hostname']}:{hi['port']}" - self.public_jwks: Dict[str, Dict[str, Any]] = {} - for username, user_info in list(self.users.items()): - if username == API_USER: - # Validate the API User - for item in ["username", "api_key", "created_on"]: - if item not in user_info: - self.users[API_USER] = { - 'username': API_USER, - 'api_key': self.api_key, - 'created_on': time.time() - } - break - continue - else: - # validate created users - valid = True - for item in ["username", "password", "salt", "created_on"]: - if item not in user_info: - logging.info( - f"Authorization: User {username} does not " - f"contain field {item}, removing") - del self.users[username] - valid = False - break - if not valid: - continue - # generate jwks for valid users - if 'jwt_secret' in user_info: - try: - priv_key = self._load_private_key(user_info['jwt_secret']) - jwk_id = user_info['jwk_id'] - except (self.server.error, KeyError): - logging.info("Invalid key found for user, removing") - user_info.pop('jwt_secret', None) - user_info.pop('jwk_id', None) - self.users[username] = user_info - continue - self.public_jwks[jwk_id] = self._generate_public_jwk(priv_key) - # sync user changes to the database - self.user_db.sync(self.users) - self.trusted_users: Dict[IPAddr, Any] = {} - self.oneshot_tokens: Dict[str, OneshotToken] = {} - - # Get allowed cors domains - self.cors_domains: List[str] = [] - for domain in config.getlist('cors_domains', []): - bad_match = re.search(r"^.+\.[^:]*\*", domain) - if bad_match is not None: - self.server.add_warning( - f"[authorization]: Unsafe domain '{domain}' in option " - f"'cors_domains'. Wildcards are not permitted in the" - " top level domain." - ) - continue - if domain.endswith("/"): - self.server.add_warning( - f"[authorization]: Invalid domain '{domain}' in option " - "'cors_domains'. Domain's cannot contain a trailing " - "slash." - ) - else: - self.cors_domains.append( - domain.replace(".", "\\.").replace("*", ".*")) - - # Get Trusted Clients - self.trusted_ips: List[IPAddr] = [] - self.trusted_ranges: List[IPNetwork] = [] - self.trusted_domains: List[str] = [] - for val in config.getlist('trusted_clients', []): - # Check IP address - try: - tc = ipaddress.ip_address(val) - except ValueError: - pass - else: - self.trusted_ips.append(tc) - continue - # Check ip network - try: - tn = ipaddress.ip_network(val) - except ValueError as e: - if "has host bits set" in str(e): - self.server.add_warning( - f"[authorization]: Invalid CIDR expression '{val}' " - "in option 'trusted_clients'") - continue - pass - else: - self.trusted_ranges.append(tn) - continue - # Check hostname - match = re.match(r"([a-z0-9]+(-[a-z0-9]+)*\.?)+[a-z]{2,}$", val) - if match is not None: - self.trusted_domains.append(val.lower()) - else: - self.server.add_warning( - f"[authorization]: Invalid domain name '{val}' " - "in option 'trusted_clients'") - - t_clients = "\n".join( - [str(ip) for ip in self.trusted_ips] + - [str(rng) for rng in self.trusted_ranges] + - self.trusted_domains) - c_domains = "\n".join(self.cors_domains) - - logging.info( - f"Authorization Configuration Loaded\n" - f"Trusted Clients:\n{t_clients}\n" - f"CORS Domains:\n{c_domains}") - - eventloop = self.server.get_event_loop() - self.prune_timer = eventloop.register_timer( - self._prune_conn_handler) - - # Register Authorization Endpoints - self.server.register_endpoint( - "/access/login", RequestType.POST, self._handle_login, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - self.server.register_endpoint( - "/access/logout", RequestType.POST, self._handle_logout, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/refresh_jwt", RequestType.POST, self._handle_refresh_jwt, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - self.server.register_endpoint( - "/access/user", RequestType.all(), self._handle_user_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/users/list", RequestType.GET, self._handle_list_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/user/password", RequestType.POST, self._handle_password_reset, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/api_key", RequestType.GET | RequestType.POST, - self._handle_apikey_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/oneshot_token", RequestType.GET, self._handle_oneshot_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/info", RequestType.GET, self._handle_info_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - # Generate TOTP record - if (self.enable_totp): - self.server.register_endpoint( - "/access/get_totp_uri", RequestType.GET, self._handle_getTOTP_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - wsm: WebsocketManager = self.server.lookup_component("websockets") - wsm.register_notification("authorization:user_created") - wsm.register_notification( - "authorization:user_deleted", event_type="logout" - ) - wsm.register_notification( - "authorization:user_logged_out", event_type="logout" - ) - - def _sync_user(self, username: str) -> None: - self.user_db[username] = self.users[username] - - async def component_init(self) -> None: - self.prune_timer.start(delay=PRUNE_CHECK_TIME) - - async def _handle_apikey_request(self, web_request: WebRequest) -> str: - if web_request.get_request_type() == RequestType.POST: - self.api_key = uuid.uuid4().hex - self.users[API_USER]['api_key'] = self.api_key - self._sync_user(API_USER) - return self.api_key - - async def _handle_oneshot_request(self, web_request: WebRequest) -> str: - ip = web_request.get_ip_address() - assert ip is not None - user_info = web_request.get_current_user() - return self.get_oneshot_token(ip, user_info) - - async def _handle_login(self, web_request: WebRequest) -> Dict[str, Any]: - ip = web_request.get_ip_address() - if ip is not None and self.check_logins_maxed(ip): - raise HTTPError( - 401, "Unauthorized, Maximum Login Attempts Reached" - ) - try: - ret = await self._login_jwt_user(web_request) - except asyncio.CancelledError: - raise - except Exception: - if ip is not None: - failed = self.failed_logins.get(ip, 0) - self.failed_logins[ip] = failed + 1 - raise - if ip is not None: - self.failed_logins.pop(ip, None) - return ret - - async def _handle_logout(self, web_request: WebRequest) -> Dict[str, str]: - user_info = web_request.get_current_user() - if user_info is None: - raise self.server.error("No user logged in") - username: str = user_info['username'] - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid log out request for user {username}") - self.users[username].pop("jwt_secret", None) - jwk_id: str = self.users[username].pop("jwk_id", None) - self._sync_user(username) - self.public_jwks.pop(jwk_id, None) - eventloop = self.server.get_event_loop() - eventloop.delay_callback( - .005, self.server.send_event, "authorization:user_logged_out", - {'username': username} - ) - return { - "username": username, - "action": "user_logged_out" - } - - async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]: - sources = ["moonraker"] - if self.ldap is not None: - sources.append("ldap") - login_req = self.force_logins and len(self.users) > 1 - request_trusted: Optional[bool] = None - user = web_request.current_user - req_ip = web_request.ip_addr - if user is not None and user.get("username") == TRUSTED_USER: - request_trusted = True - elif req_ip is not None: - request_trusted = await self._check_authorized_ip(req_ip) - return { - "default_source": self.default_source, - "available_sources": sources, - "login_required": login_req, - "trusted": request_trusted - } - - async def _handle_getTOTP_request(self, web_request: WebRequest) -> Dict[str, Any]: - username: str = web_request.get_str('username') - (secret, is_activated) = self.totp_secrets.get(username, ('', True)) - if secret == '': - raise ValueError("User does not have a TOTP key set up.") - uri = pyotp.TOTP(secret).provisioning_uri(username, issuer_name="Moonraker") - return { - "TOTP_URI": uri, - } - - async def _handle_refresh_jwt(self, - web_request: WebRequest - ) -> Dict[str, str]: - refresh_token: str = web_request.get_str('refresh_token') - try: - user_info = self.decode_jwt(refresh_token, token_type="refresh") - except Exception: - raise self.server.error("Invalid Refresh Token", 401) - username: str = user_info['username'] - if 'jwt_secret' not in user_info or "jwk_id" not in user_info: - raise self.server.error("User not logged in", 401) - private_key = self._load_private_key(user_info['jwt_secret']) - jwk_id: str = user_info['jwk_id'] - token = self._generate_jwt(username, jwk_id, private_key) - return { - 'username': username, - 'token': token, - 'source': user_info.get("source", "moonraker"), - 'action': 'user_jwt_refresh' - } - - async def _handle_user_request( - self, web_request: WebRequest - ) -> Dict[str, Any]: - req_type = web_request.get_request_type() - if req_type == RequestType.GET: - user = web_request.get_current_user() - if user is None: - return { - 'username': None, - 'source': None, - 'created_on': None, - } - else: - return { - 'username': user['username'], - 'source': user.get("source", "moonraker"), - 'created_on': user.get('created_on') - } - elif req_type == RequestType.POST: - # Create User - return await self._login_jwt_user(web_request, create=True) - elif req_type == RequestType.DELETE: - # Delete User - return self._delete_jwt_user(web_request) - raise self.server.error("Invalid Request Method") - - async def _handle_list_request(self, - web_request: WebRequest - ) -> Dict[str, List[Dict[str, Any]]]: - user_list = [] - for user in self.users.values(): - if user['username'] == API_USER: - continue - user_list.append({ - 'username': user['username'], - 'source': user.get("source", "moonraker"), - 'created_on': user['created_on'] - }) - return { - 'users': user_list - } - - async def _handle_password_reset(self, - web_request: WebRequest - ) -> Dict[str, str]: - password: str = web_request.get_str('password') - new_pass: str = web_request.get_str('new_password') - user_info = web_request.get_current_user() - if user_info is None: - raise self.server.error("No Current User") - username = user_info['username'] - if user_info.get("source", "moonraker") == "ldap": - raise self.server.error( - f"CanĀ“t Reset password for ldap user {username}") - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid Reset Request for user {username}") - salt = bytes.fromhex(user_info['salt']) - hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', password.encode(), salt, HASH_ITER).hex() - if hashed_pass != user_info['password']: - raise self.server.error("Invalid Password") - new_hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', new_pass.encode(), salt, HASH_ITER).hex() - self.users[username]['password'] = new_hashed_pass - self._sync_user(username) - if (self.enable_totp): - self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} - self.totp_secret_db.sync(self.totp_secrets) - return { - 'username': username, - 'action': "user_password_reset" - } - - async def _login_jwt_user( - self, web_request: WebRequest, create: bool = False - ) -> Dict[str, Any]: - username: str = web_request.get_str('username') - password: str = web_request.get_str('password') - source: str = web_request.get_str( - 'source', self.default_source - ).lower() - if source not in AUTH_SOURCES: - raise self.server.error(f"Invalid 'source': {source}") - user_info: Dict[str, Any] - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid Request for user {username}") - if source == "ldap": - if create: - raise self.server.error("Cannot Create LDAP User") - if self.ldap is None: - raise self.server.error( - "LDAP authentication not available", 401 - ) - await self.ldap.authenticate_ldap_user(username, password) - if username not in self.users: - create = True - if (self.enable_totp): - totp_code: str = web_request.get_str('totp_code') - if create: - if username in self.users: - raise self.server.error(f"User {username} already exists") - salt = secrets.token_bytes(32) - hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', password.encode(), salt, HASH_ITER).hex() - user_info = { - 'username': username, - 'password': hashed_pass, - 'salt': salt.hex(), - 'source': source, - 'created_on': time.time() - } - self.users[username] = user_info - self._sync_user(username) - action = "user_created" - if source == "ldap": - # Dont notify user created - action = "user_logged_in" - create = False - if (self.enable_totp): - self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} - self.totp_secret_db.sync(self.totp_secrets) - else: - if username not in self.users: - raise self.server.error(f"Unregistered User: {username}") - user_info = self.users[username] - auth_src = user_info.get("source", "moonraker") - if auth_src != source: - raise self.server.error( - f"Moonraker cannot authenticate user '{username}', must " - f"specify source '{auth_src}'", 401 - ) - salt = bytes.fromhex(user_info['salt']) - hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', password.encode(), salt, HASH_ITER).hex() - action = "user_logged_in" - if hashed_pass != user_info['password']: - raise self.server.error("Invalid Password") - if (self.enable_totp): - user_data_totp = self.totp_secrets.get(username, {'secret': '', 'is_activated': True}) - secret = user_data_totp['secret'] - is_activated = user_data_totp['is_activated'] - if secret == '': - raise self.server.error("User does not have a secret key set up.") - if pyotp.TOTP(secret).verify(totp_code) is False: - raise self.server.error("Invalid TOTP code") - if is_activated is False: - self.totp_secrets[username] = {'secret': secret, 'is_activated': True} - self.totp_secret_db.sync(self.totp_secrets) - jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) - if jwt_secret_hex is None: - private_key = Signer() - jwk_id = base64url_encode(secrets.token_bytes()).decode() - user_info['jwt_secret'] = private_key.hex_seed().decode() - user_info['jwk_id'] = jwk_id - self.users[username] = user_info - self._sync_user(username) - self.public_jwks[jwk_id] = self._generate_public_jwk(private_key) - else: - private_key = self._load_private_key(jwt_secret_hex) - jwk_id = user_info['jwk_id'] - token = self._generate_jwt(username, jwk_id, private_key) - refresh_token = self._generate_jwt( - username, jwk_id, private_key, token_type="refresh", - exp_time=datetime.timedelta(days=self.login_timeout)) - conn = web_request.get_client_connection() - if create: - event_loop = self.server.get_event_loop() - event_loop.delay_callback( - .005, self.server.send_event, - "authorization:user_created", - {'username': username}) - elif conn is not None: - conn.user_info = user_info - return { - 'username': username, - 'token': token, - 'source': user_info.get("source", "moonraker"), - 'refresh_token': refresh_token, - 'action': action - } - - def _delete_jwt_user(self, web_request: WebRequest) -> Dict[str, str]: - username: str = web_request.get_str('username') - current_user = web_request.get_current_user() - if current_user is not None: - curname = current_user.get('username', None) - if curname is not None and curname == username: - raise self.server.error( - f"Cannot delete logged in user {curname}") - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid Request for reserved user {username}") - user_info: Optional[Dict[str, Any]] = self.users.get(username) - if user_info is None: - raise self.server.error(f"No registered user: {username}") - if 'jwk_id' in user_info: - self.public_jwks.pop(user_info['jwk_id'], None) - del self.users[username] - del self.user_db[username] - event_loop = self.server.get_event_loop() - event_loop.delay_callback( - .005, self.server.send_event, - "authorization:user_deleted", - {'username': username}) - if (self.enable_totp): - del self.totp_secrets[username] - self.totp_secret_db.sync(self.totp_secrets) - return { - "username": username, - "action": "user_deleted" - } - - def _generate_jwt(self, - username: str, - jwk_id: str, - private_key: Signer, - token_type: str = "access", - exp_time: datetime.timedelta = JWT_EXP_TIME - ) -> str: - curtime = int(time.time()) - payload = { - 'iss': self.issuer, - 'aud': "Moonraker", - 'iat': curtime, - 'exp': curtime + int(exp_time.total_seconds()), - 'username': username, - 'token_type': token_type - } - header = {'kid': jwk_id} - header.update(JWT_HEADER) - jwt_header = base64url_encode(jsonw.dumps(header)) - jwt_payload = base64url_encode(jsonw.dumps(payload)) - jwt_msg = b".".join([jwt_header, jwt_payload]) - sig = private_key.signature(jwt_msg) - jwt_sig = base64url_encode(sig) - return b".".join([jwt_msg, jwt_sig]).decode() - - def decode_jwt( - self, token: str, token_type: str = "access", check_exp: bool = True - ) -> Dict[str, Any]: - message, sig = token.rsplit('.', maxsplit=1) - enc_header, enc_payload = message.split('.') - header: Dict[str, Any] = jsonw.loads(base64url_decode(enc_header)) - sig_bytes = base64url_decode(sig) - - # verify header - if header.get('typ') != "JWT" or header.get('alg') != "EdDSA": - raise self.server.error("Invalid JWT header") - jwk_id = header.get('kid') - if jwk_id not in self.public_jwks: - raise self.server.error("Invalid key ID") - - # validate signature - public_key = self._public_key_from_jwk(self.public_jwks[jwk_id]) - public_key.verify(sig_bytes + message.encode()) - - # validate claims - payload: Dict[str, Any] = jsonw.loads(base64url_decode(enc_payload)) - if payload['token_type'] != token_type: - raise self.server.error( - f"JWT Token type mismatch: Expected {token_type}, " - f"Recd: {payload['token_type']}", 401) - if payload['iss'] != self.issuer: - raise self.server.error("Invalid JWT Issuer", 401) - if payload['aud'] != "Moonraker": - raise self.server.error("Invalid JWT Audience", 401) - if check_exp and payload['exp'] < int(time.time()): - raise self.server.error("JWT Expired", 401) - - # get user - user_info: Optional[Dict[str, Any]] = self.users.get( - payload.get('username', ""), None) - if user_info is None: - raise self.server.error("Unknown user", 401) - return user_info - - def validate_jwt(self, token: str) -> Dict[str, Any]: - try: - user_info = self.decode_jwt(token) - except Exception as e: - if isinstance(e, self.server.error): - raise - raise self.server.error( - f"Failed to decode JWT: {e}", 401 - ) from e - return user_info - - def validate_api_key(self, api_key: str) -> Dict[str, Any]: - if not self.enable_api_key: - raise self.server.error("API Key authentication is disabled", 401) - if api_key and api_key == self.api_key: - return self.users[API_USER] - raise self.server.error("Invalid API Key", 401) - - def _load_private_key(self, secret: str) -> Signer: - try: - key = Signer(bytes.fromhex(secret)) - except Exception: - raise self.server.error( - "Error decoding private key, user data may" - " be corrupt", 500) from None - return key - - def _generate_public_jwk(self, private_key: Signer) -> Dict[str, Any]: - public_key = private_key.vk - return { - 'x': base64url_encode(public_key).decode(), - 'kty': "OKP", - 'crv': "Ed25519", - 'use': "sig" - } - - def _public_key_from_jwk(self, jwk: Dict[str, Any]) -> Verifier: - if jwk.get('kty') != "OKP": - raise self.server.error("Not an Octet Key Pair") - if jwk.get('crv') != "Ed25519": - raise self.server.error("Invalid Curve") - if 'x' not in jwk: - raise self.server.error("No 'x' argument in jwk") - key = base64url_decode(jwk['x']) - return Verifier(key.hex().encode()) - - def _prune_conn_handler(self, eventtime: float) -> float: - cur_time = time.time() - for ip, user_info in list(self.trusted_users.items()): - exp_time: float = user_info['expires_at'] - if cur_time >= exp_time: - self.trusted_users.pop(ip, None) - logging.info(f"Trusted Connection Expired, IP: {ip}") - for ip, fqdn_info in list(self.fqdn_cache.items()): - exp_time = fqdn_info["expires_at"] - if cur_time >= exp_time: - domain: str = fqdn_info["domain"] - self.fqdn_cache.pop(ip, None) - logging.info(f"Cached FQDN Expired, IP: {ip}, domain: {domain}") - return eventtime + PRUNE_CHECK_TIME - - def _oneshot_token_expire_handler(self, token): - self.oneshot_tokens.pop(token, None) - - def get_oneshot_token(self, - ip_addr: IPAddr, - user: Optional[Dict[str, Any]] - ) -> str: - token = base64.b32encode(os.urandom(20)).decode() - event_loop = self.server.get_event_loop() - hdl = event_loop.delay_callback( - ONESHOT_TIMEOUT, self._oneshot_token_expire_handler, token) - self.oneshot_tokens[token] = (ip_addr, user, hdl) - return token - - def _check_json_web_token( - self, request: HTTPServerRequest, required: bool = True - ) -> Optional[Dict[str, Any]]: - auth_token: Optional[str] = request.headers.get("Authorization") - if auth_token is None: - auth_token = request.headers.get("X-Access-Token") - if auth_token is None: - qtoken = request.query_arguments.get('access_token', None) - if qtoken is not None: - auth_token = qtoken[-1].decode(errors="ignore") - elif auth_token.startswith("Bearer "): - auth_token = auth_token[7:] - else: - return None - if auth_token: - try: - return self.decode_jwt(auth_token, check_exp=required) - except Exception: - logging.exception(f"JWT Decode Error {auth_token}") - raise HTTPError(401, "JWT Decode Error") - return None - - async def _check_authorized_ip(self, ip: IPAddr) -> bool: - if ip in self.trusted_ips: - return True - for rng in self.trusted_ranges: - if ip in rng: - return True - if self.trusted_domains: - if ip in self.fqdn_cache: - fqdn: str = self.fqdn_cache[ip]["domain"] - else: - eventloop = self.server.get_event_loop() - try: - fut = eventloop.run_in_thread(socket.getfqdn, str(ip)) - fqdn = await asyncio.wait_for(fut, 5.0) - except asyncio.TimeoutError: - logging.info("Call to socket.getfqdn() timed out") - return False - else: - fqdn = fqdn.lower() - self.fqdn_cache[ip] = { - "expires_at": time.time() + FQDN_CACHE_TIMEOUT, - "domain": fqdn - } - return fqdn in self.trusted_domains - return False - - async def _check_trusted_connection( - self, ip: Optional[IPAddr] - ) -> Optional[Dict[str, Any]]: - if ip is not None: - curtime = time.time() - exp_time = curtime + TRUSTED_CONNECTION_TIMEOUT - if ip in self.trusted_users: - self.trusted_users[ip]['expires_at'] = exp_time - return self.trusted_users[ip] - elif await self._check_authorized_ip(ip): - logging.info( - f"Trusted Connection Detected, IP: {ip}") - self.trusted_users[ip] = { - 'username': TRUSTED_USER, - 'password': None, - 'created_on': curtime, - 'expires_at': exp_time - } - return self.trusted_users[ip] - return None - - def _check_oneshot_token(self, - token: str, - cur_ip: Optional[IPAddr] - ) -> Optional[Dict[str, Any]]: - if token in self.oneshot_tokens: - ip_addr, user, hdl = self.oneshot_tokens.pop(token) - hdl.cancel() - if cur_ip != ip_addr: - logging.info(f"Oneshot Token IP Mismatch: expected{ip_addr}" - f", Recd: {cur_ip}") - return None - return user - else: - return None - - def check_logins_maxed(self, ip_addr: IPAddr) -> bool: - if self.max_logins is None: - return False - return self.failed_logins.get(ip_addr, 0) >= self.max_logins - - async def authenticate_request( - self, request: HTTPServerRequest, auth_required: bool = True - ) -> Optional[Dict[str, Any]]: - if request.method == "OPTIONS": - return None - - # Check JSON Web Token - jwt_user = self._check_json_web_token(request, auth_required) - if jwt_user is not None: - return jwt_user - - try: - ip = ipaddress.ip_address(request.remote_ip) # type: ignore - except ValueError: - logging.exception( - f"Unable to Create IP Address {request.remote_ip}") - ip = None - - # Check oneshot access token - ost: Optional[List[bytes]] = request.arguments.get('token', None) - if ost is not None: - ost_user = self._check_oneshot_token(ost[-1].decode(), ip) - if ost_user is not None: - return ost_user - - # Check API Key Header - if self.enable_api_key: - key: Optional[str] = request.headers.get("X-Api-Key") - if key and key == self.api_key: - return self.users[API_USER] - - # If the force_logins option is enabled and at least one user is created - # then trusted user authentication is disabled - if self.force_logins and len(self.users) > 1: - if not auth_required: - return None - raise HTTPError(401, "Unauthorized, Force Logins Enabled") - - # Check if IP is trusted. If this endpoint doesn't require authentication - # then it is acceptable to return None - trusted_user = await self._check_trusted_connection(ip) - if trusted_user is not None or not auth_required: - return trusted_user - - raise HTTPError(401, "Unauthorized") - - async def check_cors(self, origin: Optional[str]) -> bool: - if origin is None or not self.cors_domains: - return False - for regex in self.cors_domains: - match = re.match(regex, origin) - if match is not None: - if match.group() == origin: - logging.debug(f"CORS Pattern Matched, origin: {origin} " - f" | pattern: {regex}") - return True - else: - logging.debug(f"Partial Cors Match: {match.group()}") - else: - # Check to see if the origin contains an IP that matches a - # current trusted connection - match = re.search(r"^https?://([^/:]+)", origin) - if match is not None: - ip = match.group(1) - try: - ipaddr = ipaddress.ip_address(ip) - except ValueError: - pass - else: - if await self._check_authorized_ip(ipaddr): - logging.debug(f"Cors request matched trusted IP: {ip}") - return True - logging.debug(f"No CORS match for origin: {origin}\n" - f"Patterns: {self.cors_domains}") - return False - - def cors_enabled(self) -> bool: - return self.cors_domains is not None - - def close(self) -> None: - self.prune_timer.stop() - - -def load_component(config: ConfigHelper) -> Authorization: - return Authorization(config)