From 1aed1efd7758c7269385c65127d37784019ca95c Mon Sep 17 00:00:00 2001 From: Marco Realacci Date: Tue, 14 May 2024 00:23:36 +0200 Subject: [PATCH] Add comments --- .github/workflows/build-push.yml | 3 ++ main.py | 69 +++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index 7e5dcc6..f5639dd 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -38,6 +38,9 @@ jobs: uses: docker/metadata-action@v5 with: images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + # set latest tag for main branch + type=raw,value=latest,enable=${{ github.ref == format('refs/heads/{0}', 'main') }} # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository. # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. diff --git a/main.py b/main.py index 6c8870f..57f735f 100644 --- a/main.py +++ b/main.py @@ -5,18 +5,37 @@ import ipaddress import urllib3 +# Environment variables +# URL of the OPNsense instance OPNSENSE_URL = os.getenv("OPNSENSE_URL", None) +# API key for the OPNsense instance OPNSENSE_API_KEY = os.getenv("OPNSENSE_API_KEY", None) +# API secret for the OPNsense instance OPNSENSE_API_SECRET = os.getenv("OPNSENSE_API_SECRET", None) +# URL of the Technitium DNS server TECHNITIUM_URL = os.getenv("TECHNITIUM_URL", None) +# Token for the Technitium DNS server TECHNITIUM_TOKEN = os.getenv("TECHNITIUM_TOKEN", None) +# Subnets for the DNS zones DNS_ZONE_SUBNETS = os.getenv("DNS_ZONE_SUBNETS", None) +# Flag to indicate whether to do IPv4 or not DO_V4 = (os.getenv("DO_V4", "false").lower() == "true") +# Flag to indicate whether to verify HTTPS or not VERIFY_HTTPS = (os.getenv("VERIFY_HTTPS", "true").lower() == "true") +# Clock interval for the main loop CLOCK = int(os.getenv("CLOCK", "30")) def get_opnsense_data(path): + """ + Function to get data from the OPNsense API. + + Args: + path (str): The API endpoint to hit. + + Returns: + dict: The JSON response from the API call. + """ r = requests.get(url=OPNSENSE_URL + path, verify=VERIFY_HTTPS, auth=(OPNSENSE_API_KEY, OPNSENSE_API_SECRET)) if r.status_code != 200: logging.error("Error occurred" + str(r.status_code) + ": " + r.text) @@ -25,14 +44,36 @@ def get_opnsense_data(path): def get_ndp(): + """ + Function to get the NDP table from the OPNsense API. + + Returns: + dict: The JSON response from the API call. + """ return get_opnsense_data("/api/diagnostics/interface/search_ndp") def get_dhcp4_leases(): + """ + Function to get the DHCPv4 leases from the OPNsense API. + + Returns: + dict: The JSON response from the API call. + """ return get_opnsense_data("/api/dhcpv4/leases/searchLease") def build_matches(ndp, leases): + """ + Function to build matches between NDP and DHCPv4 leases. + + Args: + ndp (dict): The NDP table. + leases (dict): The DHCPv4 leases. + + Returns: + set: A set of matches. + """ matches = set() for e in leases["rows"]: ip6s = tuple(x["ip"].split("%")[0] for x in ndp["rows"] if x["mac"] == e["mac"]) @@ -43,12 +84,29 @@ def build_matches(ndp, leases): def find_zone(zones, ip4): + """ + Function to find the DNS zone for a given IPv4 address. + + Args: + zones (list): The list of DNS zones. + ip4 (str): The IPv4 address. + + Returns: + str: The DNS zone for the given IPv4 address. + """ for zone in zones: if ip4 in zone[0]: return zone[1] return None def make_record(zones, match): + """ + Function to make a DNS record for a given match. + + Args: + zones (list): The list of DNS zones. + match (tuple): The match to make a record for. + """ zone = find_zone(zones, ipaddress.ip_address(match[0])) if zone is None: logging.warning("Could not find a DNS zone for " + match[0]) @@ -79,6 +137,9 @@ def make_record(zones, match): def run(): + """ + Main function to run the script. + """ if not VERIFY_HTTPS: urllib3.disable_warnings() @@ -107,6 +168,12 @@ def run(): def verify_env() -> bool: + """ + Function to verify the environment variables. + + Returns: + bool: True if all mandatory environment variables are set, False otherwise. + """ if not OPNSENSE_URL: return False if not OPNSENSE_API_KEY: return False if not OPNSENSE_API_SECRET: return False @@ -128,4 +195,4 @@ def verify_env() -> bool: logging.info("OPNSENSE_URL: {}".format(OPNSENSE_URL)) logging.info("TECHNITIUM_URL: {}".format(TECHNITIUM_URL)) logging.info("VERIFY_HTTPS: {}".format(VERIFY_HTTPS)) - run() + run() \ No newline at end of file