Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multiple sharing options for TheHive 4.x #327

Open
wants to merge 1 commit into
base: ZzZ-1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 127 additions & 1 deletion thehive4py/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1406,4 +1406,130 @@ def update_alert_artifact(self, artifact_id, alert_artifact, fields=[]):
try:
return requests.patch(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise CaseObservableException("Case observable update error: {}".format(e))
raise CaseObservableException("Case observable update error: {}".format(e))


def create_case_share(self, case_id, case_share):

"""
Create a case share

Arguments:
case_id (str): Case identifier
case_share: Instance of [CaseShare][thehive4py.models.CaseShare]

Returns:
response (requests.Response): Response object including a JSON description of a case share

Raises:
CaseSharingException: An error occured during the case share creation

"""

req = self.url + "/api/case/{}/shares".format(case_id)
data = {"shares": [case_share.jsonify(excludes=['id'])]}

try:
return requests.post(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise CaseSharingException("Case share creation error: {}".format(e))

def delete_case_share(self, case_id, organisation):

"""
Delete a case share

Arguments:
case_id (str): Case identifier
organisation (str): Name of the organization from which the share should be deleted

Returns:
response (requests.Response): Response object including a JSON description of a case share

Raises:
CaseSharingException: An error occured during the case share deletion

"""

req = self.url + "/api/case/{}/shares".format(case_id)
data = {"organisations": [organisation]}

try:
return requests.delete(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise CaseSharingException("Case share deletion error: {}".format(e))

def list_case_shares(self, case_id):

"""
List all organizations a case is shared with

Arguments:
case_id (str): Case identifier

Returns:
response (requests.Response): Response object including a JSON description of all organizations a case is shared with

Raises:
CaseSharingException: An error occured while retrieving the sharing information

"""

req = self.url + "/api/case/{}/shares".format(case_id)

try:
return requests.get(req, headers={'Content-Type': 'application/json'}, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise CaseSharingException("Case share retrieving error: {}".format(e))

def create_case_task_share(self, task_id, organisation):

"""
Share a specific case task
!! Note: To successfully create the share the case has to be already shared with the specified organisation

Arguments:
task_id (str): Id of the task to share
organisation (str): Name of the organization to share the task with

Returns:
response (requests.Response): Response object including a JSON description of a case task share

Raises:
CaseTaskException: An error occured during case task sharing

"""

req = self.url + "/api/case/task/{}/shares".format(task_id)
data = {"organisations": [organisation]}

try:
return requests.post(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise CaseTaskException("Case task share creation error: {}".format(e))

def create_case_observable_share(self, observable_id, organisation):

"""
Share a specific case observable
!! Note: To successfully create the share the case has to be already shared with the specified organisation

Arguments:
observable_id (str): Id of the observable to share
organisation (str): Name of the organization to share the observable with

Returns:
response (requests.Response): Response object including a JSON description of a case observable share

Raises:
CaseTaskException: An error occured during case observable sharing

"""

req = self.url + "/api/case/artifact/{}/shares".format(observable_id)
data = {"organisations": [organisation]}

try:
return requests.post(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise CaseTaskException("Case observable share creation error: {}".format(e))
5 changes: 5 additions & 0 deletions thehive4py/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class CaseObservableException(CaseException):
"""
pass

class CaseSharingException(CaseException):
"""
Exception raised by failure of API calls related to `Case Sharing` handling
"""
pass

class ObservableException(TheHiveException):
"""
Expand Down
22 changes: 22 additions & 0 deletions thehive4py/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,29 @@ def __init__(self, **attributes):
self.data = [{'attachment': (filename, file_object, mime)}]
else:
self.data = data


class CaseShare(JSONSerializable):
"""
Model class describing a case share as defined in TheHive

Arguments:
organisationName (str): Name of the organization to share with or delete share from. Default: None
profile (str): Sharing profile. Default: read_only
tasks (str): Tasks to be shared. Default: None
observables (str): Observables to be shared. Default: None
json (JSON): If the field is not equal to None, the Task is instantiated using the JSON value instead of the arguements
"""

def __init__(self, **attributes):
if attributes.get('json', False):
attributes = attributes['json']

self.organisationName = attributes.get('organisationName', None)
self.profile = attributes.get('profile', None)
self.tasks = attributes.get('tasks', None)
self.observables = attributes.get('observables', None)


class Alert(JSONSerializable):
"""
Expand Down