forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 3
/
assemblyline_query.py
169 lines (149 loc) · 7.4 KB
/
assemblyline_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# -*- coding: utf-8 -*-
import json
from . import check_input_attribute, standard_error_message
from assemblyline_client import Client, ClientError
from collections import defaultdict
from pymisp import MISPAttribute, MISPEvent, MISPObject
misperrors = {'error': 'Error'}
mispattributes = {'input': ['link'], 'format': 'misp_standard'}
moduleinfo = {'version': '1', 'author': 'Christian Studer',
'description': 'Query AssemblyLine with a report URL to get the parsed data.',
'module-type': ['expansion']}
moduleconfig = ["apiurl", "user_id", "apikey", "password", "verifyssl"]
class AssemblyLineParser():
def __init__(self):
self.misp_event = MISPEvent()
self.results = {}
self.attribute = {'to_ids': True}
self._results_mapping = {'NET_DOMAIN_NAME': 'domain', 'NET_FULL_URI': 'url',
'NET_IP': 'ip-dst'}
self._file_mapping = {'entropy': {'type': 'float', 'object_relation': 'entropy'},
'md5': {'type': 'md5', 'object_relation': 'md5'},
'mime': {'type': 'mime-type', 'object_relation': 'mimetype'},
'sha1': {'type': 'sha1', 'object_relation': 'sha1'},
'sha256': {'type': 'sha256', 'object_relation': 'sha256'},
'size': {'type': 'size-in-bytes', 'object_relation': 'size-in-bytes'},
'ssdeep': {'type': 'ssdeep', 'object_relation': 'ssdeep'}}
def get_submission(self, attribute, client):
sid = attribute['value'].split('=')[-1]
try:
if not client.submission.is_completed(sid):
self.results['error'] = 'Submission not completed, please try again later.'
return
except Exception as e:
self.results['error'] = f'Something went wrong while trying to check if the submission in AssemblyLine is completed: {e.__str__()}'
return
try:
submission = client.submission.full(sid)
except Exception as e:
self.results['error'] = f"Something went wrong while getting the submission from AssemblyLine: {e.__str__()}"
return
self._parse_report(submission)
def finalize_results(self):
if 'error' in self.results:
return self.results
event = json.loads(self.misp_event.to_json())
results = {key: event[key] for key in ('Attribute', 'Object', 'Tag') if (key in event and event[key])}
return {'results': results}
def _create_attribute(self, result, attribute_type):
attribute = MISPAttribute()
attribute.from_dict(type=attribute_type, value=result['value'], **self.attribute)
if result['classification'] != 'UNCLASSIFIED':
attribute.add_tag(result['classification'].lower())
self.misp_event.add_attribute(**attribute)
return {'referenced_uuid': attribute.uuid, 'relationship_type': '-'.join(result['context'].lower().split(' '))}
def _create_file_object(self, file_info):
file_object = MISPObject('file')
filename_attribute = {'type': 'filename'}
filename_attribute.update(self.attribute)
if file_info['classification'] != "UNCLASSIFIED":
tag = {'Tag': [{'name': file_info['classification'].lower()}]}
filename_attribute.update(tag)
for feature, attribute in self._file_mapping.items():
attribute.update(tag)
file_object.add_attribute(value=file_info[feature], **attribute)
return filename_attribute, file_object
for feature, attribute in self._file_mapping.items():
file_object.add_attribute(value=file_info[feature], **attribute)
return filename_attribute, file_object
@staticmethod
def _get_results(submission_results):
results = defaultdict(list)
for k, values in submission_results.items():
h = k.split('.')[0]
for t in values['result']['tags']:
if t['context'] is not None:
results[h].append(t)
return results
def _get_scores(self, file_tree):
scores = {}
for h, f in file_tree.items():
score = f['score']
if score > 0:
scores[h] = {'name': f['name'], 'score': score}
if f['children']:
scores.update(self._get_scores(f['children']))
return scores
def _parse_report(self, submission):
if submission['classification'] != 'UNCLASSIFIED':
self.misp_event.add_tag(submission['classification'].lower())
filtered_results = self._get_results(submission['results'])
scores = self._get_scores(submission['file_tree'])
for h, results in filtered_results.items():
if h in scores:
attribute, file_object = self._create_file_object(submission['file_infos'][h])
print(file_object)
for filename in scores[h]['name']:
file_object.add_attribute('filename', value=filename, **attribute)
for reference in self._parse_results(results):
file_object.add_reference(**reference)
self.misp_event.add_object(**file_object)
def _parse_results(self, results):
references = []
for result in results:
try:
attribute_type = self._results_mapping[result['type']]
except KeyError:
continue
references.append(self._create_attribute(result, attribute_type))
return references
def parse_config(apiurl, user_id, config):
error = {"error": "Please provide your AssemblyLine API key or Password."}
if config.get('apikey'):
try:
return Client(apiurl, apikey=(user_id, config['apikey']), verify=config['verifyssl'])
except ClientError as e:
error['error'] = f'Error while initiating a connection with AssemblyLine: {e.__str__()}'
if config.get('password'):
try:
return Client(apiurl, auth=(user_id, config['password']))
except ClientError as e:
error['error'] = f'Error while initiating a connection with AssemblyLine: {e.__str__()}'
return error
def handler(q=False):
if q is False:
return False
request = json.loads(q)
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
if not request.get('config'):
return {"error": "Missing configuration."}
if not request['config'].get('apiurl'):
return {"error": "No AssemblyLine server address provided."}
apiurl = request['config']['apiurl']
if not request['config'].get('user_id'):
return {"error": "Please provide your AssemblyLine User ID."}
user_id = request['config']['user_id']
client = parse_config(apiurl, user_id, request['config'])
if isinstance(client, dict):
return client
assemblyline_parser = AssemblyLineParser()
assemblyline_parser.get_submission(request['attribute'], client)
return assemblyline_parser.finalize_results()
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo