Skip to content
This repository has been archived by the owner on Jul 5, 2023. It is now read-only.

Commit

Permalink
Make use of python-nmap (#109)
Browse files Browse the repository at this point in the history
* Make use of python-nmap

* Add test for nmap result parse

* Trick nmap wrapper to thing we have nmap installed
  • Loading branch information
johscheuer authored Aug 12, 2020
1 parent 77ed156 commit 9deef02
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 81 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ jobs:
python: "3.8"
before_script:
- pip install codecov
- export PATH="${PATH}:$(pwd)"
script:
- coverage run setup.py test --addopts="-m 'not e2e' --runslow"
- coverage run --source=./src/illuminatio/ setup.py test --addopts="-m 'not e2e' --runslow"
after_success:
- codecov

Expand Down
9 changes: 9 additions & 0 deletions nmap
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# This script is used to trick travis (actually the python-nmap lib)
# to think that nmap is istalled (which is a requirement).

echo 'Nmap version 7.80 ( https://nmap.org )
Platform: x86_64-apple-darwin19.5.0
Compiled with: nmap-liblua-5.3.5 openssl-1.1.1g nmap-libssh2-1.8.2 libz-1.2.11 nmap-libpcre-7.6 libpcap-1.9.1 nmap-libdnet-1.12 ipv6
Compiled without:
Available nsock engines: kqueue poll select'
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ setuptools>=41.0.1
coverage==4.5.3
pytest-cov==2.7.1
termcolor==1.1.0
python-nmap==0.6.1
148 changes: 68 additions & 80 deletions src/illuminatio/illuminatio_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@
This file contains the implementation of the illuminatio runner which
actively executes network policy tests inside the kubernetes cluster itself
"""
import ipaddress
import json
import logging
import os
import socket
import subprocess
import tempfile
import time
from xml.etree import ElementTree
import platform
import yaml
import nmap

import click
import click_log
import docker
import kubernetes as k8s
from nsenter import Namespace

from illuminatio.host import Host, ConcreteClusterHost
from illuminatio.k8s_util import create_test_output_config_map_manifest

# Otherwise we get an error on Mac
if platform.system() == "Linux":
from nsenter import Namespace

LOGGER = logging.getLogger(__name__)
click_log.basic_config(LOGGER)
CASE_FILE_PATH = "/etc/config/cases.yaml"
Expand All @@ -43,7 +47,7 @@ def build_result_string(port, target, should_be_blocked, was_blocked):
port,
("not " if should_be_blocked else ""),
)
return "%s\n%s" % (title, details)
return f"{title}\n{details}"


@click.command()
Expand Down Expand Up @@ -119,6 +123,25 @@ def get_pods_contained_in_both_lists(sender_pods, pods_on_node):
return sender_pods_on_node


def pod_list_contains_pod(pod, pod_list):
"""
Checks whether a list of pods contains a given pod
"""
LOGGER.debug("Searching for pod %s", pod)
if isinstance(pod, ConcreteClusterHost):
is_on_node = any([pod == pod_on_node for pod_on_node in pod_list])
if is_on_node:
LOGGER.debug("Pod %s in namespace %s was found", pod.name, pod.namespace)
else:
LOGGER.debug(
"Pod %s in namespace %s isn't on this node", pod.name, pod.namespace
)
return is_on_node

LOGGER.error("Found non-ConcreteClusterHost host in cases: %s", pod)
return False


def run_tests_for_sender_pod(sender_pod, cases):
"""
Runs test cases from the network namespace of a given pod.
Expand All @@ -144,101 +167,66 @@ def run_tests_for_target(network_ns, ports, target):
LOGGER.info("Target: %s", target)
port_on_nums = {port.replace("-", ""): port for port in ports}
port_string = ",".join(port_on_nums.keys())
# ToDo do we really need this -> we know the service already
# ToDo do we really need this -> we know the service already and could use the cluster IP
# DNS could be blocked
# resolve target ip
# Only IPv4 currently
# ToDo try catch -- > socket.gaierror: [Errno -2] Name or service not known
# Only IPv4 currently -> https://docs.python.org/3/library/socket.html#socket.getaddrinfo
svc_dns_entry = get_domain_name_for(target)
LOGGER.info(svc_dns_entry)
svc_ip = socket.gethostbyname(svc_dns_entry)
LOGGER.info("Service IP: %s for Service: %s", svc_ip, target)
with tempfile.NamedTemporaryFile() as result_file:
LOGGER.debug("Results will be stored to %s", result_file)
# remove the need for nmap!
# e.g. https://gist.github.com/betrcode/0248f0fda894013382d7
# nmap that target TODO: handle None ip
# Replace bare nmap call with a better integrated solution like: https://pypi.org/project/python-nmap/ ?
nmap_cmd = ["nmap", "-oX", result_file.name, "-Pn", "-p", port_string, svc_ip]
LOGGER.info("running nmap with cmd %s", nmap_cmd)
prc = None
with Namespace(network_ns, "net"):
prc = subprocess.run(
nmap_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if prc is None or prc.returncode:
LOGGER.error("Executing nmap in foreign net ns failed! output:")
LOGGER.error(prc.stderr)
LOGGER.debug(prc)
return {
port_string: {
"success": False,
"error": "Couldn't nmap host %s with hostname %s"
% (target, svc_dns_entry),
}
}
try:
svc_ip = socket.gethostbyname(svc_dns_entry)
except (socket.gaierror, socket.herror) as error:
return {port_string: {"success": False, "error": error}}

LOGGER.info("finished running nmap")
LOGGER.debug("Error log: %s", prc.stderr)
# when not using shell the output from nmap contains \\n instead of newline characters
LOGGER.info("Stdout: %s", str(prc.stdout).split("\\n"))
return extract_results_from_nmap_xml_file(result_file, port_on_nums, target)
LOGGER.info("Service IP: %s for Service: %s", svc_ip, target)

ipv6_arg = ""
if ipaddress.ip_address(svc_ip).version == 6:
ipv6_arg = "-6"

def pod_list_contains_pod(pod, pod_list):
"""
Checks whether a list of pods contains a given pod
"""
LOGGER.debug("Searching for pod %s", pod)
if isinstance(pod, ConcreteClusterHost):
is_on_node = any([pod == pod_on_node for pod_on_node in pod_list])
if is_on_node:
LOGGER.debug("Pod %s in namespace %s was found", pod.name, pod.namespace)
else:
LOGGER.debug(
"Pod %s in namespace %s isn't on this node", pod.name, pod.namespace
)
return is_on_node
nm_scanner = nmap.PortScanner()
with Namespace(network_ns, "net"):
nm_scanner.scan(svc_ip, arguments=f"-n -Pn -p {port_string} {ipv6_arg}")
LOGGER.info("Ran nmap with cmd %s", nm_scanner.command_line())

LOGGER.error("Found non-ConcreteClusterHost host in cases: %s", pod)
return False
return extract_results_from_nmap(nm_scanner, port_on_nums, target)


def extract_results_from_nmap_xml_file(result_file, port_on_nums, target):
def extract_results_from_nmap(nmap_res, port_on_nums, target):
"""
Extracts the results of an nmap scan from an xml result file into a dictionary
Extracts the results of an nmap scan into a dictionary
"""
xml = ElementTree.parse(result_file.name)
hosts = [h for h in xml.getroot().iter("host")]
hosts = nmap_res.all_hosts()
if len(hosts) != 1:
LOGGER.error(
"Fund %s a single host in nmap results but expected only one target to be probed",
len(hosts),
)
port_string = ",".join(port_on_nums.keys())
return {
port_string: {
"success": False,
"error": "Found %s hosts in nmap results, expected 1."
% str(len(hosts)),
"error": f"Found {len(hosts)} hosts in nmap results, expected 1.",
}
}
host_element = hosts[0]
host_names = [hn.get("name") for hn in host_element.iter("hostname")]
LOGGER.debug("Found names %s for target %s", host_names, target)

results = {}
for port_element in host_element.iter("port"):
port = port_element.get("portid")
state = port_element.find("state").get("state")
port_with_expectation = port_on_nums[port]
should_be_blocked = "-" in port_with_expectation
was_blocked = state == "filtered"
results[port_with_expectation] = {}
results[port_with_expectation]["success"] = should_be_blocked == was_blocked
results[port_with_expectation]["string"] = build_result_string(
port, target, should_be_blocked, was_blocked
)
results[port_with_expectation]["nmap-state"] = state

host = hosts[0]
for proto in nmap_res[host].all_protocols():
# get all scanned ports for all protocols
for port in nmap_res[host][proto].keys():
if proto == "tcp":
# We use the direct tcp method here for better mocking
state = nmap_res[host].tcp(port)["state"]
else:
state = nmap_res[host][proto][port]["state"]
port_with_expectation = port_on_nums[str(port)]
should_be_blocked = "-" in port_with_expectation
was_blocked = state == "filtered"
results[port_with_expectation] = {
"success": should_be_blocked == was_blocked,
"string": build_result_string(
port, target, should_be_blocked, was_blocked
),
"nmap-state": state,
}

return results


Expand Down
132 changes: 132 additions & 0 deletions tests/test_illuminatio_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import pytest
import nmap
from unittest.mock import MagicMock
from illuminatio.illuminatio_runner import (
build_result_string,
extract_results_from_nmap,
)


@pytest.mark.parametrize(
"test_input,expected",
[
(
{
"port": "80",
"target": "test",
"should_be_blocked": False,
"was_blocked": False,
},
"Test test:80 succeeded\nCould reach test on port 80. Expected target to be reachable",
),
(
{
"port": "80",
"target": "test",
"should_be_blocked": False,
"was_blocked": True,
},
"Test test:80 failed\nCouldn't reach test on port 80. Expected target to be reachable",
),
(
{
"port": "80",
"target": "test",
"should_be_blocked": True,
"was_blocked": False,
},
"Test test:-80 failed\nCould reach test on port 80. Expected target to not be reachable",
),
(
{
"port": "80",
"target": "test",
"should_be_blocked": True,
"was_blocked": True,
},
"Test test:-80 succeeded\nCouldn't reach test on port 80. Expected target to not be reachable",
),
],
)
def test_build_result_string(test_input, expected):
assert build_result_string(**test_input) == expected


def create_nmap_mock(hosts: list()):
nmap_mock = nmap.PortScanner()
nmap_mock.all_hosts = MagicMock(return_value=hosts)
nmap_mock._scan_result = MagicMock(return_value={"scan"})
if len(hosts) > 0:
nmap_mock[hosts[0]].all_protocols = MagicMock(return_value=["tcp"])
nmap_mock[hosts[0]]["tcp"].keys = MagicMock(return_value=[80])
nmap_mock[hosts[0]].tcp = MagicMock(
return_value={"state": "open", "reason": "syn-ack", "name": "http"}
)

return nmap_mock


@pytest.mark.parametrize(
"test_input,expected",
[
(
{"nmap_res": create_nmap_mock([]), "port_on_nums": {}, "target": "test"},
{
"": {
"error": "Found 0 hosts in nmap results, expected 1.",
"success": False,
}
},
),
(
{
"nmap_res": create_nmap_mock(["123.321.123.321"]),
"port_on_nums": {"80": "80"},
"target": "test",
},
{
"80": {
"nmap-state": "open",
"string": "Test test:80 succeeded\n"
"Could reach test on port 80. Expected target to be "
"reachable",
"success": True,
}
},
),
(
{
"nmap_res": create_nmap_mock(["123.321.123.321"]),
"port_on_nums": {"80": "-80"},
"target": "test",
},
{
"-80": {
"nmap-state": "open",
"string": "Test test:-80 failed\n"
"Could reach test on port 80. Expected target to not be "
"reachable",
"success": False,
}
},
),
(
{
"nmap_res": create_nmap_mock(["::1"]),
"port_on_nums": {"80": "-80"},
"target": "test",
},
{
"-80": {
"nmap-state": "open",
"string": "Test test:-80 failed\n"
"Could reach test on port 80. Expected target to not be "
"reachable",
"success": False,
}
},
),
],
)
def test_extract_results_from_nmap(test_input, expected):
assert extract_results_from_nmap(**test_input) == expected

0 comments on commit 9deef02

Please sign in to comment.