Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new: break on duplicate for object update #709

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pymisp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPO
o.from_dict(**new_object)
return o

def update_object(self, misp_object: MISPObject, object_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPObject]:
def update_object(self, misp_object: MISPObject, object_id: Optional[int] = None, pythonify: bool = False, break_on_duplicate: bool = False) -> Union[Dict, MISPObject]:
"""Update an object on a MISP instance

:param misp_object: object to update
Expand All @@ -561,7 +561,8 @@ def update_object(self, misp_object: MISPObject, object_id: Optional[int] = None
oid = get_uuid_or_id_from_abstract_misp(misp_object)
else:
oid = get_uuid_or_id_from_abstract_misp(object_id)
r = self._prepare_request('POST', f'objects/edit/{oid}', data=misp_object)
params = {'breakOnDuplicate': True} if break_on_duplicate else {}
r = self._prepare_request('POST', f'objects/edit/{oid}', data=misp_object, kw_params=params)
updated_object = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_object:
return updated_object
Expand Down
24 changes: 22 additions & 2 deletions tests/testlive_comprehensive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1480,16 +1480,36 @@ def test_add_event_with_attachment_object_controller(self):
self.assertEqual(len(obj_attrs), 1, obj_attrs)
self.assertEqual(r.name, 'file', r)

# Test break_on_duplicate at object level
# Test break_on_duplicate at object level for add_object
fo_dup, peo_dup, _ = make_binary_objects('tests/viper-test-files/test_files/whoami.exe')
r = self.user_misp_connector.add_object(first, peo_dup, break_on_duplicate=True)
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)

# Test break on duplicate with breakOnDuplicate key in object
# Test break on duplicate with breakOnDuplicate key in object for add_object
fo_dup.breakOnDuplicate = True
r = self.user_misp_connector.add_object(first, fo_dup)
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)

# Test break_on_duplicate at object level for update_object
obj1 = MISPObject('domain-ip')
obj1.add_attribute('ip', value="8.8.8.8")
obj1.add_attribute('domain', value="google.lu")
obj2 = MISPObject('domain-ip')
obj2.add_attribute('ip', value="8.8.8.8")
obj2.add_attribute('domain', value="google.fr")
o1 = self.user_misp_connector.add_object(first, obj1, pythonify=True)
self.assertEqual(o1.attributes[1].value, 'google.lu')
o2 = self.user_misp_connector.add_object(first, obj2, pythonify=True)
self.assertEqual(o2.attributes[1].value, 'google.fr')
obj2.attributes[1].value = 'google.lu'
r = self.user_misp_connector.update_object(obj2, o2.uuid, break_on_duplicate=True)
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)

# Test break on duplicate with breakOnDuplicate key in object for update_object
obj2.breakOnDuplicate = True
r = self.user_misp_connector.update_object(obj2, o2.uuid)
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)

# Test refs
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0])
self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
Expand Down