Skip to content

Commit

Permalink
Adding tests for udsclient. Will ensure code is compatible with pytho…
Browse files Browse the repository at this point in the history
…n 3.8 and greater, and that works fine
  • Loading branch information
dkmstr committed Mar 4, 2024
1 parent 936505b commit 9e2691f
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 70 deletions.
Binary file added .coverage
Binary file not shown.
17 changes: 17 additions & 0 deletions coverage.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[run]
dynamic_context = test_function
omit =
*/shibokensupport/*
*/signature_bootstrap.py
branch = True

[report]
skip_empty = True
exclude_lines =
pragma: no cover
raise NotImplementedError
if typing.TYPE_CHECKING:

[html]
show_contexts = True
title = UDS Client Test Coverage Report
51 changes: 30 additions & 21 deletions src/UDSClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import typing

from uds.ui import QtCore, QtWidgets, QtGui, QSettings, Ui_MainWindow # pyright: ignore
from uds.rest import RestApi, RetryException, InvalidVersion
from uds.rest import RestApi

# Just to ensure there are available on runtime
from uds.tunnel import forward as tunnel_forwards # pyright: ignore[reportUnusedImport]
Expand Down Expand Up @@ -138,7 +138,7 @@ def stop_animation(self) -> None:
def fetch_version(self) -> None:
try:
self.api.get_version()
except InvalidVersion as e:
except exceptions.InvalidVersion as e:
QtWidgets.QMessageBox.critical(
self,
'Upgrade required',
Expand Down Expand Up @@ -174,7 +174,7 @@ def fetch_transport_data(self) -> None:
# Execute the waiting tasks...
threading.Thread(target=end_script).start()

except RetryException as e:
except exceptions.RetryException as e:
self.ui.info.setText(str(e) + ', retrying access...')
# Retry operation in ten seconds
QtCore.QTimer.singleShot(10000, self.fetch_transport_data)
Expand Down Expand Up @@ -251,7 +251,7 @@ def verify_host_approval(hostName: str) -> bool:
return approved


def ssl_error_processor(hostname: str, serial: str) -> bool:
def ssl_certificate_validator(hostname: str, serial: str) -> bool:
settings = QSettings()
settings.beginGroup('ssl')

Expand Down Expand Up @@ -281,7 +281,7 @@ def minimal(api: RestApi, ticket: str, scrambler: str) -> int:
logger.debug('Getting version')
try:
api.get_version()
except InvalidVersion as e:
except exceptions.InvalidVersion as e:
QtWidgets.QMessageBox.critical(
None, # type: ignore
'Upgrade required',
Expand All @@ -298,7 +298,7 @@ def minimal(api: RestApi, ticket: str, scrambler: str) -> int:
# Execute the waiting task...
threading.Thread(target=end_script).start()

except RetryException as e:
except exceptions.RetryException as e:
QtWidgets.QMessageBox.warning(
None, # type: ignore
'Service not ready',
Expand Down Expand Up @@ -337,31 +337,31 @@ def parse_arguments(args: typing.List[str]) -> typing.Tuple[str, str, str, bool]

use_minimal_interface = False
uds_url = args[1]

if uds_url == '--minimal':
use_minimal_interface = True
uds_url = args[2] # And get URI

if uds_url == '--test':
raise exceptions.UDSArgumentException('test')
raise exceptions.ArgumentException('test')

try:
urlinfo = urllib.parse.urlparse(uds_url)
ticket, scrambler = urlinfo.path.split('/')[1:3]
except Exception:
raise exceptions.UDSMessageException('Invalid UDS URL')
raise exceptions.MessageException('Invalid UDS URL')

# Check if minimal interface is requested on the URL
if 'minimal' in urllib.parse.parse_qs(urlinfo.query):
use_minimal_interface = True

if urlinfo.scheme == 'uds':
if not consts.DEBUG:
raise exceptions.UDSMessageException(
raise exceptions.MessageException(
'UDS Client Version {} does not support HTTP protocol Anymore.'.format(VERSION)
)
elif urlinfo.scheme != 'udss':
raise exceptions.UDSMessageException('Not supported protocol') # Just shows "about" dialog
raise exceptions.MessageException('Not supported protocol') # Just shows "about" dialog

return (
urlinfo.netloc,
Expand Down Expand Up @@ -397,7 +397,7 @@ def main(args: typing.List[str]) -> int:
# First parameter must be url
try:
host, ticket, scrambler, _use_minimal_interface = parse_arguments(args)
except exceptions.UDSMessageException as e:
except exceptions.MessageException as e:
logger.debug('Detected execution without valid URI, exiting: %s', e)
QtWidgets.QMessageBox.critical(
None, # type: Ignore
Expand All @@ -406,7 +406,7 @@ def main(args: typing.List[str]) -> int:
QtWidgets.QMessageBox.StandardButton.Ok,
)
return 1
except exceptions.UDSArgumentException as e:
except exceptions.ArgumentException as e:
# Currently only test, return 0
return 0
except Exception:
Expand All @@ -419,15 +419,18 @@ def main(args: typing.List[str]) -> int:
)
return 1

# Setup REST api endpoint
api = RestApi(f'https://{host}/uds/rest/client', ssl_error_processor)
# Setup REST api and ssl certificate validator
api = RestApi.api(
host,
on_invalid_certificate=ssl_certificate_validator,
)

try:
logger.debug('Starting execution')

# Approbe before going on
if verify_host_approval(host) is False:
raise Exception('Host {} was not approved'.format(host))
raise exceptions.MessageException('Host {} was not approved'.format(host))

win = UDSClient(api, ticket, scrambler)
win.show()
Expand All @@ -438,10 +441,16 @@ def main(args: typing.List[str]) -> int:
logger.debug('Main execution finished correctly: %s', exit_code)

except Exception as e:
logger.exception('Got an exception executing client:')
if not isinstance(e, exceptions.MessageException):
logger.exception('Got an exception executing client:')
else:
logger.info('Message from error: %s', e)
exit_code = 128
QtWidgets.QMessageBox.critical(
None, 'Error', f'Fatal error: {e}', QtWidgets.QMessageBox.StandardButton.Ok # type: ignore
None,
'Error',
f'Fatal error: {e}',
QtWidgets.QMessageBox.StandardButton.Ok,
)

logger.debug('Exiting')
Expand Down
20 changes: 18 additions & 2 deletions src/uds/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
class UDSException(Exception):
pass


class UDSMessageException(Exception):
class MessageException(UDSException):
pass

class UDSArgumentException(Exception):

class ArgumentException(UDSException):
pass


class RetryException(UDSException):
pass


class InvalidVersion(UDSException):
downloadUrl: str

def __init__(self, downloadUrl: str) -> None:
super().__init__(downloadUrl)
self.downloadUrl = downloadUrl
60 changes: 18 additions & 42 deletions src/uds/rest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017-2021 Virtual Cable S.L.U.
# Copyright (c) 2017-2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
Expand Down Expand Up @@ -43,29 +43,14 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend

from . import consts, tools
from . import consts, tools, exceptions
from .log import logger

# Callback for error on cert
# parameters are hostname, serial
# If returns True, ignores error
CertCallbackType = typing.Callable[[str, str], bool]

# Exceptions
class UDSException(Exception):
pass


class RetryException(UDSException):
pass


class InvalidVersion(UDSException):
downloadUrl: str

def __init__(self, downloadUrl: str) -> None:
super().__init__(downloadUrl)
self.downloadUrl = downloadUrl

class RestApi:

Expand All @@ -84,25 +69,20 @@ def __init__(
self._on_invalid_certificate = on_invalid_certificate
self._server_version = ''

def get(
self, path: str, params: typing.Optional[typing.Mapping[str, str]] = None
) -> typing.Any:
def get(self, path: str, params: typing.Optional[typing.Mapping[str, str]] = None) -> typing.Any:
if params:
path += '?' + '&'.join(
'{}={}'.format(k, urllib.parse.quote(str(v).encode('utf8')))
for k, v in params.items()
'{}={}'.format(k, urllib.parse.quote(str(v).encode('utf8'))) for k, v in params.items()
)

return json.loads(
RestApi.get_url(self._rest_api_endpoint + path, self._on_invalid_certificate)
)
return json.loads(RestApi.get_url(self._rest_api_endpoint + path, self._on_invalid_certificate))

def process_error(self, data: typing.Any) -> None:
if 'error' in data:
if data.get('retryable', '0') == '1':
raise RetryException(data['error'])
raise exceptions.RetryException(data['error'])

raise UDSException(data['error'])
raise exceptions.UDSException(data['error'])

def get_version(self) -> str:
'''Gets and stores the serverVersion.
Expand All @@ -118,17 +98,15 @@ def get_version(self) -> str:

try:
if self._server_version > consts.VERSION:
raise InvalidVersion(downloadUrl)
raise exceptions.InvalidVersion(downloadUrl)

return self._server_version
except InvalidVersion:
except exceptions.InvalidVersion:
raise
except Exception as e:
raise UDSException(e)
raise exceptions.UDSException(e) from e

def get_script_and_parameters(
self, ticket: str, scrambler: str
) -> typing.Tuple[str, typing.Any]:
def get_script_and_parameters(self, ticket: str, scrambler: str) -> typing.Tuple[str, typing.Any]:
'''Gets the transport script, validates it if necesary
and returns it'''
try:
Expand Down Expand Up @@ -160,18 +138,14 @@ def get_script_and_parameters(
if tools.verify_signature(script, signature) is False:
logger.error('Signature is invalid')

raise Exception(
'Invalid UDS code signature. Please, report to administrator'
)
raise Exception('Invalid UDS code signature. Please, report to administrator')

return script.decode(), params

# exec(script.decode("utf-8"), globals(), {'parent': self, 'sp': params})

@staticmethod
def _open(
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
) -> typing.Any:
def _open(url: str, certErrorCallback: typing.Optional[CertCallbackType] = None) -> typing.Any:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
Expand Down Expand Up @@ -237,9 +211,11 @@ def _open_url(url: str) -> typing.Any:
return response

@staticmethod
def get_url(
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
) -> bytes:
def api(host: str, on_invalid_certificate: CertCallbackType) -> 'RestApi':
return RestApi(f'https://{host}/uds/rest/client', on_invalid_certificate)

@staticmethod
def get_url(url: str, certErrorCallback: typing.Optional[CertCallbackType] = None) -> bytes:
with RestApi._open(url, certErrorCallback) as response:
resp = response.read()

Expand Down
Empty file added tests/__init__.py
Empty file.
25 changes: 20 additions & 5 deletions tests/test_commandline.py → tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
from unittest import TestCase

import UDSClient
from uds import exceptions, consts
from uds import exceptions, consts, rest

from .utils import fixtures

logger = logging.getLogger(__name__)


class TestTunnel(TestCase):
class TestClient(TestCase):
def test_commandline(self):
def _check_url(url: str, minimal: typing.Optional[str] = None, with_minimal: bool = False) -> None:
host, ticket, scrambler, use_minimal = UDSClient.parse_arguments(
Expand All @@ -54,16 +56,16 @@ def _check_url(url: str, minimal: typing.Optional[str] = None, with_minimal: boo
UDSClient.parse_arguments(['udsclient'])

# Valid command line, but not an URI. should return UDSArgumentException
with self.assertRaises(exceptions.UDSArgumentException):
with self.assertRaises(exceptions.ArgumentException):
UDSClient.parse_arguments(['udsclient', '--test'])

# unkonwn protocol, should return UDSArgumentException
with self.assertRaises(exceptions.UDSMessageException):
with self.assertRaises(exceptions.MessageException):
UDSClient.parse_arguments(['udsclient', 'unknown://' + 'a' * 2048])

# uds protocol, but withoout debug mode, should rais exception.UDSMessagException
consts.DEBUG = False
with self.assertRaises(exceptions.UDSMessageException):
with self.assertRaises(exceptions.MessageException):
_check_url('uds://a/b/c')

# Set DEBUG mode (on consts), now should work
Expand All @@ -77,3 +79,16 @@ def _check_url(url: str, minimal: typing.Optional[str] = None, with_minimal: boo
_check_url('udss://a/b/c', '--minimal', with_minimal=True)
# No matter what is passed as value of minimal, if present, it will be used
_check_url('udss://a/b/c?minimal=11', with_minimal=True)

def test_rest(self):
# This is a simple test, we will test the rest api is mocked correctly
with fixtures.patch_rest_api() as api:
self.assertEqual(api.get_version(), fixtures.SERVER_VERSION)
self.assertEqual(api.get_script_and_parameters('ticket', 'scrambler'), (fixtures.SCRIPT, fixtures.PARAMETERS))

from_api = rest.RestApi.api('host', lambda x, y: True)
# Repeat tests, should return same results
self.assertEqual(from_api.get_version(), fixtures.SERVER_VERSION)
self.assertEqual(from_api.get_script_and_parameters('ticket', 'scrambler'), (fixtures.SCRIPT, fixtures.PARAMETERS))
# And also, the api is the same
self.assertEqual(from_api, api)
Empty file added tests/utils/__init__,py
Empty file.
Loading

0 comments on commit 9e2691f

Please sign in to comment.