release 1.5
- Small refactoring to improve usage as an imported library
- Add threat info to alerts if present
- Updated dependencies
- Added support for pip > 20 build

Co-Authored-By: Arcuri Davide <[email protected]>
garanews and dadokkio committed May 19, 2020
1 parent 317122c commit dd21bbd
Showing 7 changed files with 164 additions and 57 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -4,9 +4,15 @@
*~
*.swp

# MyPy
.mypy_cache/

# PyCharm folder
/.idea/

# Log file
mans_to_es.log

# Generic auto-generated build files
*.pyc
*.pyo
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,20 @@
# Changelog

## [1.5] 2020-05-19
- Small refactoring to improve usage as an imported library
- Add threat info to alerts if present
- Aligned dependencies to Timesketch requirements
- Added support for pip > 20 build

## [1.4] 2019-10-03
- Support for extracting multiple fields as comments
- Keep all meta by default
- Check if elastic is up @deralexxx

## [1.3] 2019-07-29
- Added process-api to processed items
- Skip if not explicitly selected
- Timestamp parsing improvement

## [1.0] 2019-07-24
- First working release
10 changes: 9 additions & 1 deletion README.md
@@ -28,7 +28,7 @@ pip install mans-to-es

If you want to use it as a standalone script, you can download it, place it under /usr/local/bin, and make it executable.

#### Usage
#### Usage as script

```
$ mans_to_es.py --help
@@ -53,6 +53,14 @@ optional arguments:
```

#### Usage as library

```
>>> from mans_to_es import MansToEs
>>> a = MansToEs(filename = '<file.mans>', index="<index>", name="<name>", es_host="localhost", es_port=9200)
>>> a.run()
```
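
The constructor also accepts the tuning knobs exposed on the CLI; `bulk_size` and `cpu_count` below mirror the defaults in the new `__init__` signature (1000 and one less than the machine's core count), and the file name is a placeholder:

```
>>> from mans_to_es import MansToEs
>>> a = MansToEs(filename='<file.mans>', index="<index>", name="<name>",
...              es_host="localhost", es_port=9200, bulk_size=1000, cpu_count=3)
>>> a.run()
```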

## Contributing

**If you want to contribute to mans_to_es, be sure to review the [contributing guidelines](CONTRIBUTING.md). This project adheres to mans_to_es
3 changes: 3 additions & 0 deletions mans_to_es/__init__.py
@@ -0,0 +1,3 @@
from mans_to_es.mans_to_es import MansToEs

__all__ = ["MansToEs"]
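
With this re-export, the class is importable straight from the package root (`from mans_to_es import MansToEs`) instead of from `mans_to_es.mans_to_es`, which is the point of the library-usage refactoring; `__all__` (as a list of strings) additionally pins what a wildcard `from mans_to_es import *` exposes.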
127 changes: 88 additions & 39 deletions mans_to_es/mans_to_es.py
@@ -11,15 +11,16 @@
import datetime
import ciso8601

import xmltodict

import pandas as pd
import xmltodict # type: ignore
import pandas as pd # type: ignore

from multiprocessing import Pool, cpu_count

import elasticsearch
import elasticsearch # type: ignore
from elasticsearch import helpers, Elasticsearch

from typing import Tuple, Union, BinaryIO, TextIO, Dict, List, Mapping, Any

from glob import glob

# hide ES log
@@ -32,7 +33,7 @@
FORMAT = "%(asctime)-15s %(message)s"
logging.basicConfig(filename="mans_to_es.log", level=logging.DEBUG, format=FORMAT)

MANS_FIELDS = {
MANS_FIELDS: Dict[str, Any] = {
"persistence": {
"key": "PersistenceItem",
"datefields": [
@@ -163,7 +164,7 @@
}


def convert_both_pandas(argument, offset=0):
def convert_both_pandas(argument: str, offset=0) -> pd.Series:
"""
convert_both_pandas: parse a date field and convert it to the proper format
in:
@@ -182,7 +183,7 @@ def convert_both_pandas(argument, offset=0):
return pd.Series([None, None])


def convert_both(argument, offset=0):
def convert_both(argument: str, offset=0) -> Union[Tuple[str, str], Tuple[None, None]]:
"""
convert_both: parse a date field and convert it to the proper format
in:
@@ -199,7 +200,7 @@ def convert_both(argument, offset=0):
return None, None
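
For orientation, a standalone sketch of what this trio of helpers does together: `ciso8601` parses the ISO-8601 timestamps found in the .mans XML, `convert_skew` (just below) turns the audit's clock-skew string into seconds, and the adjusted timestamp is split into date and time parts. The `"PT-3600S"` skew format and the exact output split are assumptions for illustration, not confirmed by this diff:

```python
import datetime
import ciso8601  # fast ISO-8601 parser used by mans_to_es

def skew_seconds(offset: str) -> int:
    # Hypothetical "PT-3600S"-style skew string -> signed seconds.
    return int(float(offset.replace("PT", "").replace("S", "")))

ts = ciso8601.parse_datetime("2020-05-19T10:00:00Z")
ts += datetime.timedelta(seconds=skew_seconds("PT-3600S"))
print(ts.date().isoformat(), ts.time().isoformat())  # 2020-05-19 09:00:00
```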


def convert_skew(offset):
def convert_skew(offset: str) -> int:
"""
convert_skew: return offset for xml file in seconds
in:
@@ -215,33 +216,42 @@


class MansToEs:
def __init__(self, args):
self.filename = args.filename
self.index = args.index
self.name = args.name
self.bulk_size = args.bulk_size
self.cpu_count = args.cpu_count
def __init__(
self,
filename: str,
index: str,
name: str,
es_host: str,
es_port: int,
bulk_size: int = 1000,
cpu_count: int = cpu_count() - 1,
):
self.filename = filename
self.index = index
self.name = name
self.bulk_size = bulk_size
self.cpu_count = cpu_count
self.folder_path = self.filename + "__tmp"
self.offset_stateagentinspector = None
self.es_info = {"host": args.es_host, "port": args.es_port}
self.upload_parts = []
self.filelist = {}
self.ioc_alerts = []
self.exd_alerts = []
self.generic_items = {}
self.es_info = {"host": es_host, "port": es_port}
self.filelist: Dict[str, Tuple[str, int]] = {}
self.ioc_alerts: Dict[str, Any] = {}
self.exd_alerts: List[Mapping[str, str]] = []
self.generic_items: Dict[str, Any] = {}

es = Elasticsearch([self.es_info])
if not es.ping():
raise ValueError("Connection failed")

logging.debug(f"[MAIN] Start parsing {args.filename}.")
logging.debug(f"[MAIN] Pushing on {args.name} index and {args.index} timeline")
logging.debug(f"[MAIN] Start parsing {self.filename}.")
logging.debug(f"[MAIN] Pushing on {self.name} index and {self.index} timeline")

def handle_stateagentinspector(self, path, item_detail):
def handle_stateagentinspector(self, path, item_detail) -> bool:
"""
handle_stateagentinspector: streaming function for xmltodict (stateagentinspector items)
In:
path: xml item path
item_detail: xml item data
"""
item = {}
uid = path[1][1]["uid"]
@@ -269,10 +279,13 @@ def handle_stateagentinspector(self, path, item_detail):
.get(item_detail["eventType"], {})
.get("hits_key", None),
)
if self.ioc_alerts[uid]:
item["threat_info"] = self.ioc_alerts[uid]

self.generic_items.setdefault(path[1][0], []).append(item)
return True

def handle_item(self, path, item_detail):
def handle_item(self, path, item_detail) -> bool:
"""
handle_item: streaming function for xmltodict
In:
@@ -283,7 +296,13 @@ def handle_item(self, path, item_detail):
self.generic_items.setdefault(path[1][0], []).append(item_detail)
return True
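
For context on these `handle_*` callbacks: xmltodict's streaming mode walks the document and calls back at a fixed depth, and returning `True` tells it to keep going. A minimal self-contained sketch — the XML, tag names, and depth are invented for illustration:

```python
import xmltodict

def handle_item(path, item_detail):
    # path is a list of (tag, attributes) pairs from the root down to
    # the emitted item; path[1][0] is the enclosing list element's tag.
    print(path[1][0], item_detail)
    return True  # keep streaming; returning False aborts the parse

xml = b"<audit><ItemList><Item><name>a</name></Item></ItemList></audit>"
xmltodict.parse(xml, item_depth=3, item_callback=handle_item)
```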

def generate_df(self, file, offset, filetype, stateagentinspector=False):
def generate_df(
self,
file: TextIO,
offset: int,
filetype: str,
stateagentinspector: bool = False,
) -> Tuple[pd.DataFrame, bool]:
"""
Generate dataframe from xml file
"""
@@ -311,6 +330,9 @@ def extract_mans(self):
logging.debug(f"[MAIN] Unzip file in {self.folder_path} [✔]")

def delete_temp_folder(self):
"""
Delete temporary folder
"""
try:
shutil.rmtree(self.folder_path)
logging.debug("[MAIN] temporary folder deleted [✔]")
@@ -339,17 +361,29 @@ def parse_manifest(self):

def parse_hits(self):
"""
Get hit and alert from hits.json file
Get hit and alert from hits.json file, threat info from threats.json
"""
threats_info = {}
if not os.path.exists(os.path.join(self.folder_path, "threats.json")):
logging.debug("[MAIN] Parsing threats.json [missing]")
else:
with open(os.path.join(self.folder_path, "threats.json"), "r") as f:
for x in json.load(f):
threats_info[x.get("uri_name")] = x

if not os.path.exists(os.path.join(self.folder_path, "hits.json")):
logging.debug("[MAIN] Parsing Hits.json [missing]")
else:
with open(os.path.join(self.folder_path, "hits.json"), "r") as f:
for x in json.load(f):
if x.get("data", {}).get("key", None):
event_id = str(x["data"]["key"]["event_id"])
if event_id not in self.ioc_alerts:
self.ioc_alerts.append(event_id)
threat_id = x.get("threat_id", None)
if event_id not in self.ioc_alerts.keys():
self.ioc_alerts[event_id] = threats_info.get(
threat_id, None
)

elif x.get("data", {}).get("documents", None) or x.get(
"data", {}
).get("analysis_details", None):
@@ -373,10 +407,11 @@
),
}
)

if len(self.exd_alerts) > 0:
es = Elasticsearch([self.es_info])
helpers.bulk(
es, self.exd_alerts, index=self.index, doc_type="generic_event"
es, self.exd_alerts, index=self.index, # doc_type="generic_event"
)
logging.debug(
"[MAIN] Parsing Hits.json - %d alerts found [✔]"
Expand Down Expand Up @@ -408,7 +443,7 @@ def process(self):
res = pool.starmap_async(self.process_file, files_list).get()
logging.debug("[MAIN] Pre-Processing [✔]")

def process_file(self, filetype, file, offset):
def process_file(self, filetype: str, file: str, offset: int):
"""
process_file: parse xml to df and clean it
In:
@@ -541,14 +576,25 @@ def to_elastic(self):
es,
elk_items,
index=self.index,
doc_type="generic_event",
# doc_type="generic_event",
chunk_size=self.bulk_size,
request_timeout=60,
),
maxlen=0,
)
logging.debug("[MAIN] Parallel elastic push [✔]")

def run(self):
"""
Main process
"""
self.extract_mans()
self.parse_manifest()
self.parse_hits()
self.process()
self.to_elastic()
self.delete_temp_folder()


def main():
parser = argparse.ArgumentParser(
@@ -578,21 +624,24 @@ def main():
)

parser.add_argument(
"--version", dest="version", action="version", version="%(prog)s 1.4"
"--version", dest="version", action="version", version="%(prog)s 1.5"
)
args = parser.parse_args()

if not all([args.name, args.index, args.es_port, args.es_host]):
parser.print_usage()
else:
try:
mte = MansToEs(args)
mte.extract_mans()
mte.parse_manifest()
mte.parse_hits()
mte.process()
mte.to_elastic()
mte.delete_temp_folder()
mte = MansToEs(
args.filename,
args.index,
args.name,
args.es_host,
args.es_port,
args.bulk_size,
args.cpu_count,
)
mte.run()
logging.debug("[MAIN] Operation Completed [✔✔✔]")
except:
logging.exception("Error parsing .mans")
14 changes: 7 additions & 7 deletions requirements.txt
@@ -1,10 +1,10 @@
certifi>=2017.7.27.1
elasticsearch==7.0.2
numpy==1.16.4
pandas==0.25.0
python-dateutil==2.8.0
pytz==2019.1
elasticsearch>=7.5.1
numpy>=1.16.6
pandas>=0.24.2
python-dateutil==2.8.1
pytz==2020.1
six==1.12.0
urllib3==1.25.3
urllib3==1.25.9
xmltodict==0.12.0
ciso8601==2.1.1
ciso8601>=2.1.1