Skip to content

Commit

Permalink
Use local os storage to store master key and encrypt saved password w…
Browse files Browse the repository at this point in the history
…ith it before saving to pgadmin db instead of one entry for each saved passoword.
  • Loading branch information
yogeshmahajan-1903 committed Aug 12, 2024
1 parent 1a143c9 commit 85fd138
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 469 deletions.
2 changes: 1 addition & 1 deletion web/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@
# Applicable for desktop mode only
##########################################################################
MASTER_PASSWORD_REQUIRED = True

USE_OS_SECRET_STORAGE = True
##########################################################################

# pgAdmin encrypts the database connection and ssh tunnel password using a
Expand Down
242 changes: 107 additions & 135 deletions web/pgadmin/browser/__init__.py

Large diffs are not rendered by default.

316 changes: 61 additions & 255 deletions web/pgadmin/browser/server_groups/servers/__init__.py

Large diffs are not rendered by default.

289 changes: 286 additions & 3 deletions web/pgadmin/browser/server_groups/servers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@

"""Server helper utilities"""
from ipaddress import ip_address
import keyring
from flask_login import current_user
from werkzeug.exceptions import InternalServerError
from flask import render_template

from pgadmin.utils.constants import KEY_RING_USERNAME_FORMAT, \
KEY_RING_SERVICE_NAME, KEY_RING_USER_NAME, KEY_RING_TUNNEL_FORMAT
from pgadmin.utils.crypto import encrypt, decrypt
import config
from pgadmin.model import db, Server
from flask import current_app
from pgadmin.utils.exception import CryptKeyMissing
from pgadmin.utils.master_password import validate_master_password, \
get_crypt_key, set_masterpass_check_text


def is_valid_ipaddress(address):
Expand Down Expand Up @@ -232,6 +239,211 @@ def _password_check(server, manager, old_key, new_key):
manager.password = password


def migrate_passwords_from_os_secret_storage(servers, enc_key):
"""
Migrate password stored in os secret storage
:param servers: server list
:param enc_key: new encryption key
:return: True if successful else False
"""
passwords_migrated = False
try:
if len(servers) > 0:
for server in servers:
server_name = KEY_RING_USERNAME_FORMAT.format(server.name,
server.id)
print('migrating for ' + server_name + 'from os storage')
server_password = keyring.get_password(
KEY_RING_SERVICE_NAME, server_name)
if server_password:
server_password = encrypt(server_password, enc_key)
setattr(server, 'password', server_password)
else:
setattr(server, 'save_password', 0)

tunnel_name = KEY_RING_TUNNEL_FORMAT.format(server.name,
server.id)
tunnel_password = keyring.get_password(
KEY_RING_SERVICE_NAME, tunnel_name)
if tunnel_password:
setattr(server, 'tunnel_password', tunnel_password)
keyring.delete_password(
KEY_RING_SERVICE_NAME, tunnel_name)
else:
setattr(server, 'tunnel_password', None)
passwords_migrated = True
return passwords_migrated
else:
# This means no server password to migrate
return passwords_migrated
except Exception as e:
current_app.logger.warning(
'Failed to migrate passwords stored using OS password manager.'
'Error: {0}'.format(e))
return False


def migrate_passwords_from_pgadmin_db(servers, old_key, enc_key):
"""
Migrates passwords stored in pgadmin db
:param servers: list of servers
:param old_key: old encryption key
:param enc_key: new encryption key
:return: True if successful else False
"""
try:
for ser in servers:
if ser.password:
password = decrypt(ser.password, old_key).decode()
server_password = encrypt(password, enc_key)
setattr(ser, 'password', server_password)

if ser.tunnel_password:
password = decrypt(ser.tunnel_password, old_key).decode()
tunnel_password = encrypt(password, enc_key)
setattr(ser, 'tunnel_password', tunnel_password)
return True
except Exception as e:
current_app.logger.warning(
'Failed to migrate passwords stored using master password or user'
' password password manager. Error: {0}'.format(e))
# TODO - Really required? - need a test case
config.USE_OS_SECRET_STORAGE = False
return False


def migrate_saved_passwords(master_key, master_password, old_crypt_key):
"""
Function will migrate password stored in pgadmin db and os secret storage
with separate entry for each server(initial keyring implementation #5123).
Now all saved passwords will be stored in pgadmin db which are encrypted
using master_key which is stored in local os storage.
:param master_key: encryption key from local os storage
:param master_password: set by user if MASTER_PASSWORD_REQUIRED=True
:param old_crypt_key: enc_key with ith passwords were encrypted when
MASTER_PASSWORD_REQUIRED=False
:return: True if all passwords are migrated successfully.
"""
if config.USE_OS_SECRET_STORAGE:
error = ''
crypt_key = None
passwords_migrated = False
if config.ALLOW_SAVE_PASSWORD or config.ALLOW_SAVE_TUNNEL_PASSWORD:
# Get servers with saved password
all_server = Server.query.all()
saved_password_servers = \
[ser for ser in all_server
if ser.save_password or ser.tunnel_password]

servers_with_pwd_in_os_secret = []
servers_with_pwd_in_pgadmin_db = []
for ser in saved_password_servers:
if ser.password is None:
servers_with_pwd_in_os_secret.append(ser)
else:
servers_with_pwd_in_pgadmin_db.append(ser)

# No server passwords are saved
if len(saved_password_servers) == 0:
current_app.logger.warning(
'There are no saved passwords')
return passwords_migrated, error

# Get master password if enabled
if config.MASTER_PASSWORD_REQUIRED:
# & received master_password is None &
# masterpass_check is present & there are saved passwords
if not master_password and \
current_user.masterpass_check is not None and\
len(servers_with_pwd_in_pgadmin_db) > 0:
error = "Master Password required"
return passwords_migrated, error
# master_password is received but incorrect
elif master_password and \
not validate_master_password(master_password):
error = "Incorrect master password"
return passwords_migrated, error
else:
# retrieve crypt key which is newly generated encryption key
# and stored in keyring
crypt_key_present, crypt_key = get_crypt_key()
if not crypt_key_present:
raise CryptKeyMissing()

# Try migration
try:
# server passwords stored in pgadmin db are present
# password are encrypted with master password or old_crypt_key
if len(servers_with_pwd_in_pgadmin_db) > 0:
# if master_password present and masterpass_check is
# present, server passwords are encrypted with
# master password
if master_password and \
current_user.masterpass_check is not None:
current_app.logger.warning(
'Re-encrypting passwords saved using '
'master password')
passwords_migrated = migrate_passwords_from_pgadmin_db(
servers_with_pwd_in_pgadmin_db,
master_password,
master_key)
# clear master_pass check once passwords are migrated
if passwords_migrated:
set_masterpass_check_text('', clear=True)
# if old_crypt_key is present & is different from crypt_key
# server passwords are encrypted wit old_crypt_key
else:
if old_crypt_key and crypt_key != old_crypt_key:
current_app.logger.warning(
'Re-encrypting passwords saved using'
' user password')
passwords_migrated = \
migrate_passwords_from_pgadmin_db(
servers_with_pwd_in_pgadmin_db,
old_crypt_key,
master_key)
# TODO - Really required?
# else:
# current_app.logger.warning(
# 'As master key was already present could not
# re-encrypt password encrypted with
# user password')

# servers passwords stored with os storage are present.
if len(servers_with_pwd_in_os_secret) > 0:
current_app.logger.warning(
'Re-encrypting passwords saved using '
'os password manager')
passwords_migrated = \
migrate_passwords_from_os_secret_storage(
servers_with_pwd_in_os_secret, master_key)

if passwords_migrated:
# commit the changes once all are migrated
db.session.commit()
# Add code to delete passwords from os password manager
if len(servers_with_pwd_in_os_secret) > 0:
delete_saved_passwords_from_os_secret_storage(
servers_with_pwd_in_os_secret)
# TODO - Really required?
# # Clear the masterpass_check to indicate migration is
# # completed for servers encrypted with master password
# if current_user.masterpass_check is not None:
# set_masterpass_check_text('', clear=True)
# Update driver manager with new passwords
update_session_manager(saved_password_servers)
current_app.logger.warning(
'Password migration is successful')
return passwords_migrated, error
except Exception as e:
current_app.logger.warning(
'Failed to migrate passwords stored using Master password,'
' user password, OS secret storage. Fallback to older'
' mechanism used. Error: {0}'.format(e))
config.USE_OS_SECRET_STORAGE = False
return passwords_migrated, error


def reencrpyt_server_passwords(user_id, old_key, new_key):
"""
This function will decrypt the saved passwords in SQLite with old key
Expand All @@ -242,7 +454,6 @@ def reencrpyt_server_passwords(user_id, old_key, new_key):

for server in Server.query.filter_by(user_id=user_id).all():
manager = driver.connection_manager(server.id)

_password_check(server, manager, old_key, new_key)

if server.tunnel_password is not None:
Expand Down Expand Up @@ -274,13 +485,85 @@ def remove_saved_passwords(user_id):
try:
db.session.query(Server) \
.filter(Server.user_id == user_id) \
.update({Server.password: None, Server.tunnel_password: None})
.update({Server.password: None, Server.tunnel_password: None,
Server.save_password: 0})
db.session.commit()
except Exception:
db.session.rollback()
raise


def delete_saved_passwords_from_os_secret_storage(servers):
"""
Delete passwords from os secret storage
:param servers: server list
:return: True if successful else False
"""
try:
if len(servers) > 0:
for server in servers:
server_name = KEY_RING_USERNAME_FORMAT.format(server.name,
server.id)
print('Deleting pwd from os storage for ' + server_name)
server_password = keyring.get_password(
KEY_RING_SERVICE_NAME, server_name)
if server_password:
print('Not actually deleting server_password from os')
# keyring.delete_password(
# KEY_RING_SERVICE_NAME, server_name)
else:
setattr(server, 'save_password', 0)

tunnel_name = KEY_RING_TUNNEL_FORMAT.format(server.name,
server.id)
tunnel_password = keyring.get_password(
KEY_RING_SERVICE_NAME, tunnel_name)
if tunnel_password:
print('Not actually deleting tunnel_password from os')
# keyring.delete_password(
# KEY_RING_SERVICE_NAME, tunnel_name)
else:
setattr(server, 'tunnel_password', None)
return True
else:
# This means no server password to migrate
return False
except Exception as e:
current_app.logger.warning(
'Failed to delete passwords stored in OS password manager.'
'Error: {0}'.format(e))
return False


def update_session_manager(user_id=None, servers=None):
"""
Updates the passwords in the session
:param user_id:
:param servers:
:return:
"""
from pgadmin.model import Server
from pgadmin.utils.driver import get_driver
driver = get_driver(config.PG_DEFAULT_DRIVER)
try:
if user_id:
for server in Server.query.\
filter_by(user_id=current_user.id).all():
manager = driver.connection_manager(server.id)
manager.update(server)
elif servers:
for server in servers:
manager = driver.connection_manager(server.id)
manager.update(server)
else:
return False
db.session.commit()
return True
except Exception:
db.session.rollback()
raise


def get_replication_type(conn, sversion):
status, res = conn.execute_dict(render_template(
"/servers/sql/#{0}#/replication_type.sql".format(sversion)
Expand Down
1 change: 1 addition & 0 deletions web/pgadmin/browser/tests/test_master_password.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MasterPasswordTestCase(BaseTestGenerator):
def setUp(self):
config.MASTER_PASSWORD_REQUIRED = True
config.AUTHENTICATION_SOURCES = [INTERNAL]
config.USE_OS_SECRET_STORAGE = False

def runTest(self):
"""This function will check change password functionality."""
Expand Down
5 changes: 2 additions & 3 deletions web/pgadmin/evaluate_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,14 @@ def evaluate_and_patch_config(config: dict) -> dict:
config['ENABLE_PSQL'] = True

if config.get('SERVER_MODE'):
config.setdefault('DISABLED_LOCAL_PASSWORD_STORAGE', True)
config.setdefault('USE_OS_SECRET_STORAGE', False)
config.setdefault('KEYRING_NAME', '')
else:
k_name = keyring.get_keyring().name
if k_name == 'fail Keyring':
config.setdefault('DISABLED_LOCAL_PASSWORD_STORAGE', True)
config.setdefault('USE_OS_SECRET_STORAGE', False)
config.setdefault('KEYRING_NAME', '')
else:
config.setdefault('DISABLED_LOCAL_PASSWORD_STORAGE', False)
config.setdefault('KEYRING_NAME', k_name)

config.setdefault('SESSION_COOKIE_PATH', config.get('COOKIE_DEFAULT_PATH'))
Expand Down
2 changes: 1 addition & 1 deletion web/pgadmin/static/js/Dialogs/index.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export function checkMasterPassword(data, masterpass_callback_queue, cancel_call

if(isKeyring) {
pgAdmin.Browser.notifier.alert(gettext('Migration successful'),
gettext(`Passwords previously saved by pgAdmin have been successfully migrated to ${res.data.data.keyring_name} and removed from the pgAdmin store.`));
gettext(`Passwords previously saved by pgadmin have been successfully encrypted using ${res.data.data.keyring_name}.`));
}
}
}).catch(function(error) {
Expand Down
1 change: 1 addition & 0 deletions web/pgadmin/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@


KEY_RING_SERVICE_NAME = 'pgAdmin4'
KEY_RING_USER_NAME = 'pgadmin4'
KEY_RING_USERNAME_FORMAT = KEY_RING_SERVICE_NAME + '-{0}-{1}'
KEY_RING_TUNNEL_FORMAT = KEY_RING_SERVICE_NAME + '-tunnel-{0}-{1}'
KEY_RING_DESKTOP_USER = KEY_RING_SERVICE_NAME + '-desktop-user-{0}'
Expand Down
Loading

0 comments on commit 85fd138

Please sign in to comment.