From 05ac0771dd5bcf924286a57e1baa90f83a540c85 Mon Sep 17 00:00:00 2001 From: Mathieu Tarral Date: Thu, 30 Apr 2020 06:34:36 +0200 Subject: [PATCH] security: add type checking --- hooks/security.py | 69 ++++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/hooks/security.py b/hooks/security.py index 8ca70bc..cb59f2e 100644 --- a/hooks/security.py +++ b/hooks/security.py @@ -3,14 +3,17 @@ from collections import Counter from dataclasses import dataclass from pathlib import Path +from re import Match from tempfile import NamedTemporaryFile +from typing import List from checksec.elf import ELFSecurity, PIEType, RelroType, set_libc from checksec.errors import ErrorNotAnElf, ErrorParsingFailed +from guestfs import GuestFS from see import Hook from hooks.filesystem import Inode -from oswatcher.model import OSType +from oswatcher.model import OS, OSType @dataclass @@ -44,20 +47,20 @@ class SecurityHook(Hook): def __init__(self, parameters): super().__init__(parameters) self.os_info = None - self.stats = Counter() + self.stats: Counter = Counter() self.stats['total'] = 0 self.local_guest_libc = NamedTemporaryFile() - self.neo4j_enabled = self.configuration.get('neo4j', False) + self.neo4j_enabled: bool = self.configuration.get('neo4j', False) if self.neo4j_enabled: - self.os_node = self.configuration['neo4j']['OS'] - self.keep_binaries = self.configuration.get('keep_failed_binaries', False) + self.os_node: OS = self.configuration['neo4j']['OS'] + self.keep_binaries: bool = self.configuration.get('keep_failed_binaries', False) # directory to dump executable on which checksec failed if self.neo4j_enabled: os_id = self.os_node.id else: os_id = self.context.domain.name() - default_checksec_failed_dir = Path.cwd() / f"{os_id}_checksec_failed" - self.keep_binaries_dir = self.configuration.get('keep_failed_dir', default_checksec_failed_dir) + default_checksec_failed_dir: Path = Path.cwd() / f"{os_id}_checksec_failed" + self.keep_binaries_dir: Path = Path(self.configuration.get('keep_failed_dir', default_checksec_failed_dir)) self.context.subscribe('detected_os_info', self.get_os_info) self.context.subscribe('filesystem_capture_begin', self.download_libc) @@ -68,7 +71,7 @@ def get_os_info(self, event): def download_libc(self, event): """Locate and download the libc""" - gfs = event.gfs + gfs: GuestFS = event.gfs if not self.os_info: raise RuntimeError('Expected OS Info') @@ -77,35 +80,35 @@ def download_libc(self, event): return # find ldd - cmd = ['which', 'ldd'] + cmd: List = ['which', 'ldd'] try: - ldd_path = gfs.command(cmd).strip() + ldd_path: str = gfs.command(cmd).strip() except RuntimeError: self.logger.warning("Libc detection: command %s failed", cmd) return # find ls - cmd = ['which', 'ls'] + cmd: List = ['which', 'ls'] try: - ls_path = gfs.command(cmd).strip() + ls_path: str = gfs.command(cmd).strip() except RuntimeError: self.logger.warning("Libc detection: command %s failed", cmd) return - cmd = [ldd_path, ls_path] + cmd: List = [ldd_path, ls_path] try: - ldd_output = gfs.command(cmd).strip() + ldd_output: str = gfs.command(cmd).strip() except RuntimeError: self.logger.warning("Libc detection: command %s failed", cmd) return libc_inode = None for ldd_line in ldd_output.splitlines(): - m = re.match(r'\t*(?P.*)\s+(=>)?\s+(?P\S+)?\s+\((?P.*)\)$', ldd_line) + m: Match = re.match(r'\t*(?P.*)\s+(=>)?\s+(?P\S+)?\s+\((?P.*)\)$', ldd_line) if not m: self.logger.warn("Libc detection: line \"%s\" doesn't match LDD regex", ldd_line) continue if m.group('libname').startswith('libc.so'): # found guest libc - libc_inode = Inode(self.logger, gfs, Path(m.group('libpath'))) + libc_inode: Inode = Inode(self.logger, gfs, Path(m.group('libpath'))) break if libc_inode is None: self.logger.warning("Libc detection: Couldn't locate libc !") @@ -118,21 +121,21 @@ def download_libc(self, event): def check_file(self, event): # event args - inode = event.inode + inode: Inode = event.inode if not self.os_info['os_type'] == OSType.Linux: # checksec only supports ELF files return - mime = inode.file_magic_type - filepath = inode.path + mime: str = inode.file_magic_type + filepath: Path = inode.path if re.match(r'application/x(-pie)?-(executable|sharedlib)', mime): self.logger.info('Checking security of %s: %s', filepath, mime) self.stats['total'] += 1 # this is a heavy call (download the file on the host filesystem through libguestfs appliance) # call it here once we filtered on the mime type provided by the file utility - local_filepath = inode.local_file + local_filepath: Path = inode.local_file try: - elf = ELFSecurity(local_filepath) + elf: ELFSecurity = ELFSecurity(local_filepath) except ErrorNotAnElf: self.stats['failed'] += 1 self.logger.warning("Not a valid ELF file: %s (%s)", filepath, inode.gfs_file) @@ -148,18 +151,18 @@ def check_file(self, event): shutil.copy(inode.local_file, dst) return else: - relro = elf.relro - canary = elf.has_canary - nx = elf.has_nx - pie = elf.pie - rpath = elf.has_rpath - runpath = elf.has_runpath - symbols = not elf.is_stripped - fortified = elf.is_fortified - fortify_source = len(elf.fortified) - fortifyable = len(elf.fortifiable) - - checksec_file = ChecksecFile(relro, canary, nx, pie, rpath, runpath, + relro: RelroType = elf.relro + canary: bool = elf.has_canary + nx: bool = elf.has_nx + pie: PIEType = elf.pie + rpath: bool = elf.has_rpath + runpath: bool = elf.has_runpath + symbols: bool = not elf.is_stripped + fortified: bool = elf.is_fortified + fortify_source: int = len(elf.fortified) + fortifyable: int = len(elf.fortifiable) + + checksec_file: ChecksecFile = ChecksecFile(relro, canary, nx, pie, rpath, runpath, symbols, fortify_source, fortified, fortifyable) self.logger.debug("Properties: %s", checksec_file) self.context.trigger('security_checksec_bin', inode=inode, checksec_file=checksec_file)