Skip to content

Commit

Permalink
security: add type checking
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenzel committed Apr 30, 2020
1 parent f3af44a commit 05ac077
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions hooks/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from re import Match
from tempfile import NamedTemporaryFile
from typing import List

from checksec.elf import ELFSecurity, PIEType, RelroType, set_libc
from checksec.errors import ErrorNotAnElf, ErrorParsingFailed
from guestfs import GuestFS
from see import Hook

from hooks.filesystem import Inode
from oswatcher.model import OSType
from oswatcher.model import OS, OSType


@dataclass
Expand Down Expand Up @@ -44,20 +47,20 @@ class SecurityHook(Hook):
def __init__(self, parameters):
super().__init__(parameters)
self.os_info = None
self.stats = Counter()
self.stats: Counter = Counter()
self.stats['total'] = 0
self.local_guest_libc = NamedTemporaryFile()
self.neo4j_enabled = self.configuration.get('neo4j', False)
self.neo4j_enabled: bool = self.configuration.get('neo4j', False)
if self.neo4j_enabled:
self.os_node = self.configuration['neo4j']['OS']
self.keep_binaries = self.configuration.get('keep_failed_binaries', False)
self.os_node: OS = self.configuration['neo4j']['OS']
self.keep_binaries: bool = self.configuration.get('keep_failed_binaries', False)
# directory to dump executable on which checksec failed
if self.neo4j_enabled:
os_id = self.os_node.id
else:
os_id = self.context.domain.name()
default_checksec_failed_dir = Path.cwd() / f"{os_id}_checksec_failed"
self.keep_binaries_dir = self.configuration.get('keep_failed_dir', default_checksec_failed_dir)
default_checksec_failed_dir: Path = Path.cwd() / f"{os_id}_checksec_failed"
self.keep_binaries_dir: Path = Path(self.configuration.get('keep_failed_dir', default_checksec_failed_dir))

self.context.subscribe('detected_os_info', self.get_os_info)
self.context.subscribe('filesystem_capture_begin', self.download_libc)
Expand All @@ -68,7 +71,7 @@ def get_os_info(self, event):

def download_libc(self, event):
"""Locate and download the libc"""
gfs = event.gfs
gfs: GuestFS = event.gfs

if not self.os_info:
raise RuntimeError('Expected OS Info')
Expand All @@ -77,35 +80,35 @@ def download_libc(self, event):
return

# find ldd
cmd = ['which', 'ldd']
cmd: List = ['which', 'ldd']
try:
ldd_path = gfs.command(cmd).strip()
ldd_path: str = gfs.command(cmd).strip()
except RuntimeError:
self.logger.warning("Libc detection: command %s failed", cmd)
return
# find ls
cmd = ['which', 'ls']
cmd: List = ['which', 'ls']
try:
ls_path = gfs.command(cmd).strip()
ls_path: str = gfs.command(cmd).strip()
except RuntimeError:
self.logger.warning("Libc detection: command %s failed", cmd)
return
cmd = [ldd_path, ls_path]
cmd: List = [ldd_path, ls_path]
try:
ldd_output = gfs.command(cmd).strip()
ldd_output: str = gfs.command(cmd).strip()
except RuntimeError:
self.logger.warning("Libc detection: command %s failed", cmd)
return

libc_inode = None
for ldd_line in ldd_output.splitlines():
m = re.match(r'\t*(?P<libname>.*)\s+(=>)?\s+(?P<libpath>\S+)?\s+\((?P<addr>.*)\)$', ldd_line)
m: Match = re.match(r'\t*(?P<libname>.*)\s+(=>)?\s+(?P<libpath>\S+)?\s+\((?P<addr>.*)\)$', ldd_line)
if not m:
self.logger.warn("Libc detection: line \"%s\" doesn't match LDD regex", ldd_line)
continue
if m.group('libname').startswith('libc.so'):
# found guest libc
libc_inode = Inode(self.logger, gfs, Path(m.group('libpath')))
libc_inode: Inode = Inode(self.logger, gfs, Path(m.group('libpath')))
break
if libc_inode is None:
self.logger.warning("Libc detection: Couldn't locate libc !")
Expand All @@ -118,21 +121,21 @@ def download_libc(self, event):

def check_file(self, event):
# event args
inode = event.inode
inode: Inode = event.inode

if not self.os_info['os_type'] == OSType.Linux:
# checksec only supports ELF files
return
mime = inode.file_magic_type
filepath = inode.path
mime: str = inode.file_magic_type
filepath: Path = inode.path
if re.match(r'application/x(-pie)?-(executable|sharedlib)', mime):
self.logger.info('Checking security of %s: %s', filepath, mime)
self.stats['total'] += 1
# this is a heavy call (download the file on the host filesystem through libguestfs appliance)
# call it here once we filtered on the mime type provided by the file utility
local_filepath = inode.local_file
local_filepath: Path = inode.local_file
try:
elf = ELFSecurity(local_filepath)
elf: ELFSecurity = ELFSecurity(local_filepath)
except ErrorNotAnElf:
self.stats['failed'] += 1
self.logger.warning("Not a valid ELF file: %s (%s)", filepath, inode.gfs_file)
Expand All @@ -148,18 +151,18 @@ def check_file(self, event):
shutil.copy(inode.local_file, dst)
return
else:
relro = elf.relro
canary = elf.has_canary
nx = elf.has_nx
pie = elf.pie
rpath = elf.has_rpath
runpath = elf.has_runpath
symbols = not elf.is_stripped
fortified = elf.is_fortified
fortify_source = len(elf.fortified)
fortifyable = len(elf.fortifiable)

checksec_file = ChecksecFile(relro, canary, nx, pie, rpath, runpath,
relro: RelroType = elf.relro
canary: bool = elf.has_canary
nx: bool = elf.has_nx
pie: PIEType = elf.pie
rpath: bool = elf.has_rpath
runpath: bool = elf.has_runpath
symbols: bool = not elf.is_stripped
fortified: bool = elf.is_fortified
fortify_source: int = len(elf.fortified)
fortifyable: int = len(elf.fortifiable)

checksec_file: ChecksecFile = ChecksecFile(relro, canary, nx, pie, rpath, runpath,
symbols, fortify_source, fortified, fortifyable)
self.logger.debug("Properties: %s", checksec_file)
self.context.trigger('security_checksec_bin', inode=inode, checksec_file=checksec_file)
Expand Down

0 comments on commit 05ac077

Please sign in to comment.