Skip to content

Commit

Permalink
this should fail
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Sep 27, 2024
1 parent 9d89446 commit aff3351
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 55 deletions.
19 changes: 19 additions & 0 deletions tests/system_tests_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,25 @@ def test_ssl_client_profile_update_load(self):

clients = []
for test in range(10):

# Expect failure
bad_ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
bad_ssl_domain.set_trusted_ca_db(CA_CERT)
bad_ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA_CERT)
bad_ssl_domain.set_credentials(CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD)
bad_conn_args = {'sasl_enabled': True,
'allowed_mechs': "EXTERNAL",
'ssl_domain': bad_ssl_domain}

with self.assertRaises(Exception) as exc:
test_tx = AsyncTestSender(f"amqps://localhost:{self.listener2_port}",
target="test/addr",
message=message,
container_id=f"BADTX{test}",
conn_args=bad_conn_args,
get_link_info=False)
test_tx.wait()

for c_index in range(4):
c_id = f"FooTx-{c_index}"
test_tx = AsyncTestSender(f"amqps://localhost:{self.listener2_port}",
Expand Down
124 changes: 69 additions & 55 deletions tests/system_tests_tcp_adaptor_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from system_test import CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD
from system_test import SERVER2_CERTIFICATE, SERVER2_PRIVATE_KEY, SERVER2_PRIVATE_KEY_PASSWORD
from system_test import SSL_PROFILE_TYPE
#from system_test import is_pattern_present
from system_test import is_pattern_present
from system_tests_ssl import RouterTestSslBase
from system_tests_tcp_adaptor import TcpAdaptorBase, CommonTcpTests, ncat_available
from http1_tests import wait_tcp_listeners_up
Expand Down Expand Up @@ -870,27 +870,23 @@ def test_ssl_profile_update(self):
#

skmgr_a = self.router_qdra.sk_manager
skmgr_b = self.router_qdrb.sk_manager

skmgr_a.update(SSL_PROFILE_TYPE, self.listener_profile_cfg, name='listener-ssl-profile')
skmgr_b.update(SSL_PROFILE_TYPE, self.connector_profile_cfg, name='connector-ssl-profile')

# with self.assertRaises(Exception) as emgr:
# skmgr_a.update(SSL_PROFILE_TYPE, {'password': 'badpassword'},
# name='listener-ssl-profile')
with self.assertRaises(Exception) as emgr:
skmgr_a.update(SSL_PROFILE_TYPE, {'password': 'badpassword'},
name='listener-ssl-profile')

# self.assertIn('Failed to set TLS certFile', str(emgr.exception))
self.assertIn('Failed to set TLS certFile', str(emgr.exception))

#
# Restore the proper password and verify clients can connect
#

# skmgr_a.update(SSL_PROFILE_TYPE, {'password':
# SERVER_PRIVATE_KEY_PASSWORD},
# name='listener-ssl-profile')
skmgr_a.update(SSL_PROFILE_TYPE, {'password':
SERVER_PRIVATE_KEY_PASSWORD},
name='listener-ssl-profile')

# out = skmgr_a.read(name='listener-ssl-profile')
# self.assertEqual(SERVER_PRIVATE_KEY_PASSWORD, out['password'])
out = skmgr_a.read(name='listener-ssl-profile')
self.assertEqual(SERVER_PRIVATE_KEY_PASSWORD, out['password'])

out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
Expand All @@ -905,29 +901,29 @@ def test_ssl_profile_update(self):
# Now update the listener sslProfile with a valid config, but one that
# will not allow the client to connect
#
# new_cfg = {'caCertFile': CA2_CERT,
# 'certFile': SERVER2_CERTIFICATE,
# 'privateKeyFile': SERVER2_PRIVATE_KEY,
# 'password': SERVER2_PRIVATE_KEY_PASSWORD}
# skmgr_a.update(SSL_PROFILE_TYPE, new_cfg, name='listener-ssl-profile')

# out, error = self.opensslclient(port=self.router_listener_port,
# ssl_info=client_ssl_info,
# data=b"The CA will not allow this!" + payload,
# expect=Process.EXIT_FAIL,
# cl_args=self.s_client_args)
# self.router_qdra.wait_log_message(r'TLS connection failed')
new_cfg = {'caCertFile': CA2_CERT,
'certFile': SERVER2_CERTIFICATE,
'privateKeyFile': SERVER2_PRIVATE_KEY,
'password': SERVER2_PRIVATE_KEY_PASSWORD}
skmgr_a.update(SSL_PROFILE_TYPE, new_cfg, name='listener-ssl-profile')

out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"The CA will not allow this!" + payload,
expect=Process.EXIT_FAIL,
cl_args=self.s_client_args)
self.router_qdra.wait_log_message(r'TLS connection failed')

#
# Update the client ssl_info to use a compatible client cert and verify
# all is well:
#

# client_ssl_info = dict()
# client_ssl_info['CA_CERT'] = CA2_CERT
# client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT2_CERTIFICATE
# client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT2_PRIVATE_KEY
# client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT2_PRIVATE_KEY_PASSWORD
client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA2_CERT
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT2_CERTIFICATE
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT2_PRIVATE_KEY
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT2_PRIVATE_KEY_PASSWORD
out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Hey we recovered!" + payload,
Expand All @@ -948,11 +944,11 @@ def test_ssl_profile_update(self):
#

openssl_server.teardown()
# server_ssl_info = dict()
# server_ssl_info['CA_CERT'] = CA2_CERT
# server_ssl_info['SERVER_CERTIFICATE'] = SERVER2_CERTIFICATE
# server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER2_PRIVATE_KEY
# server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER2_PRIVATE_KEY_PASSWORD
server_ssl_info = dict()
server_ssl_info['CA_CERT'] = CA2_CERT
server_ssl_info['SERVER_CERTIFICATE'] = SERVER2_CERTIFICATE
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER2_PRIVATE_KEY
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER2_PRIVATE_KEY_PASSWORD
openssl_server = server_create(listening_port=self.openssl_server_listening_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer2",
Expand All @@ -962,28 +958,21 @@ def test_ssl_profile_update(self):
ssl_info=client_ssl_info,
data=b"The server conn must fail" + payload,
cl_args=self.s_client_args)
self.assertIn(b"Verification: OK", out, f"{error}")
self.assertIn(b"Verify return code: 0 (ok)", out, f"{error}")
openssl_server.wait_out_message("The server conn must fail")

skmgr_a.update(SSL_PROFILE_TYPE, self.listener_profile_cfg, name='listener-ssl-profile')
skmgr_b.update(SSL_PROFILE_TYPE, self.connector_profile_cfg, name='connector-ssl-profile')

# self.router_qdrb.wait_log_message(r'TLS connection failed')
# with open(openssl_server.outfile_path, 'rt') as out_file:
# self.assertFalse(is_pattern_present(out_file,
# "The server conn must fail"),
# "TLS connection did not fail")
self.router_qdrb.wait_log_message(r'TLS connection failed')
with open(openssl_server.outfile_path, 'rt') as out_file:
self.assertFalse(is_pattern_present(out_file,
"The server conn must fail"),
"TLS connection did not fail")

# Now update the connectors sslProfile with a compatible client cert
# and verify a new connection succeeds

# new_cfg = {'caCertFile': CA2_CERT,
# 'certFile': CLIENT2_CERTIFICATE,
# 'privateKeyFile': CLIENT2_PRIVATE_KEY,
# 'password': CLIENT2_PRIVATE_KEY_PASSWORD}
# skmgr_b = self.router_qdrb.sk_manager
#skmgr_b.update(SSL_PROFILE_TYPE, new_cfg, name='connector-ssl-profile')
new_cfg = {'caCertFile': CA2_CERT,
'certFile': CLIENT2_CERTIFICATE,
'privateKeyFile': CLIENT2_PRIVATE_KEY,
'password': CLIENT2_PRIVATE_KEY_PASSWORD}
skmgr_b = self.router_qdrb.sk_manager
skmgr_b.update(SSL_PROFILE_TYPE, new_cfg, name='connector-ssl-profile')
out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"The server conn must succeed!" + payload,
Expand Down Expand Up @@ -1122,11 +1111,35 @@ def test_ssl_profile_update_load(self):
echo_server.logger.dump()
self.assertTrue(echo_server.is_running, "Echo Server failed to start")

# Negative test - expect failure

bad_ssl_info = {'CLIENT_CERTIFICATE': CLIENT2_CERTIFICATE,
'CLIENT_PRIVATE_KEY': CLIENT2_PRIVATE_KEY,
'CLIENT_PRIVATE_KEY_PASSWORD': CLIENT2_PRIVATE_KEY_PASSWORD,
'CA_CERT': CA2_CERT}

client_name = "SslProfileEchoClientBAD"
client_logger = Logger(title=client_name,
print_to_console=False,
ofilename=os.path.join(os.path.dirname(os.getcwd()),
f"{client_name}.log"))
with self.assertRaises(Exception) as exc:
bad_client = TcpEchoClient(prefix=client_name,
host='localhost',
port=self.router_listener_port,
size=5000,
count=10,
logger=client_logger,
ssl_info=bad_ssl_info)
bad_client.wait()

# now test multiple simultaineous clients while updating the sslProfile

clients = []

for test in range(10):
for c_index in range(4):
client_name = f"SslProfileEchoClient-{test}-{c_index}"
client_name = f"SslProfileEchoClient-{test}-{c_index}-A"
client_logger = Logger(title=client_name,
print_to_console=False,
ofilename=os.path.join(os.path.dirname(os.getcwd()),
Expand All @@ -1139,6 +1152,7 @@ def test_ssl_profile_update_load(self):
logger=client_logger,
ssl_info=client_ssl_info)
clients.append(echo_client)

skmgr_a.update(SSL_PROFILE_TYPE, self.listener_profile_cfg, name='listener-ssl-profile')
skmgr_b.update(SSL_PROFILE_TYPE, self.connector_profile_cfg, name='connector-ssl-profile')

Expand Down

0 comments on commit aff3351

Please sign in to comment.