Skip to content

Commit

Permalink
Merge pull request #62 from dadokkio/master
Browse files Browse the repository at this point in the history
Add support for campaings entity added in MITRE v12
  • Loading branch information
Cyb3rWard0g committed Nov 19, 2022
2 parents da8c545 + ff8ed68 commit 3bcac3b
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 8 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,5 @@ venv.bak/
# intellij
.idea/

.DS_Store
.DS_Store
.history
148 changes: 141 additions & 7 deletions attackcti/attack_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ def translate_stix_objects(self, stix_objects):
"x_mitre_collection_layers": "collection_layers",
"x_mitre_contributors": "contributors"
}
campaign_stix_mapping = {
"type": "type",
"id": "id",
"created_by_ref": "created_by_ref",
"created": "created",
"modified": "modified",
"name": "name",
"description": "campaign_description",
"aliases": "campaign_aliases",
"object_marking_refs": "object_marking_refs",
"external_references": "external_references",
"x_mitre_first_seen_citation": "first_seen_citation",
"x_mitre_last_seen_citation": "last_seen_citation"
}

# ******** Helper Functions ********
def handle_list(list_object, object_type):
Expand All @@ -233,6 +247,8 @@ def handle_list(list_object, object_type):
obj_dict['tactic_id'] = list_object[0]['external_id']
elif obj_dict['type'] == 'matrix':
obj_dict['matrix_id'] = list_object[0]['external_id']
elif obj_dict['type'] == 'campaign':
obj_dict['campaign_id'] = list_object[0]['external_id']
elif object_type == "kill_chain_phases":
tactic_list = list()
for phase in list_object:
Expand Down Expand Up @@ -269,6 +285,8 @@ def handle_list(list_object, object_type):
stix_mapping = marking_stix_mapping
elif obj['type'] == "x-mitre-data-source":
stix_mapping = data_source_stix_mapping
elif obj['type'] == "campaign":
stix_mapping = campaign_stix_mapping
else:
return stix_objects_list

Expand Down Expand Up @@ -333,7 +351,8 @@ def get_enterprise(self, stix_format=True):
"tactics": self.get_enterprise_tactics,
"matrix": Filter("type", "=", "x-mitre-matrix"),
"identity": Filter("type", "=", "identity"),
"marking-definition": Filter("type", "=", "marking-definition")
"marking-definition": Filter("type", "=", "marking-definition"),
"campaign": self.get_enterprise_campaigns
}
enterprise_stix_objects = dict()
for key in enterprise_filter_objects:
Expand All @@ -342,6 +361,25 @@ def get_enterprise(self, stix_format=True):
enterprise_stix_objects[key] = self.translate_stix_objects(enterprise_stix_objects[key])
return enterprise_stix_objects

def get_enterprise_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
""" Extracts all the available campaigns STIX objects in the Enterprise ATT&CK matrix
Args:
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
Returns:
List of STIX objects
"""
enterprise_campaigns = self.TC_ENTERPRISE_SOURCE.query([Filter("type", "=", "campaign")])

if skip_revoked_deprecated:
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)

if not stix_format:
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)
return enterprise_campaigns

def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, enrich_data_sources = False, stix_format=True):
""" Extracts all the available techniques STIX objects in the Enterprise ATT&CK matrix
Expand Down Expand Up @@ -639,15 +677,36 @@ def get_mobile(self, stix_format=True):
"tactics": self.get_mobile_tactics,
"matrix": Filter("type", "=", "x-mitre-matrix"),
"identity": Filter("type", "=", "identity"),
"marking-definition": Filter("type", "=", "marking-definition")
"marking-definition": Filter("type", "=", "marking-definition"),
"campaigns": self.get_mobile_campaigns
}
mobile_stix_objects = {}
for key in mobile_filter_objects:
mobile_stix_objects[key] = self.TC_MOBILE_SOURCE.query(mobile_filter_objects[key]) if isinstance(mobile_filter_objects[key], Filter) else mobile_filter_objects[key]()
if not stix_format:
mobile_stix_objects[key] = self.translate_stix_objects(mobile_stix_objects[key])
return mobile_stix_objects


def get_mobile_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix
Args:
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
Returns:
List of STIX objects
"""

mobile_campaigns = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "campaign"))

if skip_revoked_deprecated:
mobile_campaigns = self.remove_revoked_deprecated(mobile_campaigns)

if not stix_format:
mobile_campaigns = self.translate_stix_objects(mobile_campaigns)
return mobile_campaigns

def get_mobile_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True):
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix
Expand Down Expand Up @@ -948,7 +1007,32 @@ def get_stix_objects(self, stix_format=True):
for resource_type in attack_stix_objects[matrix].keys():
attack_stix_objects[matrix][resource_type] = self.translate_stix_objects(attack_stix_objects[matrix][resource_type])
return attack_stix_objects


def get_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
""" Extracts all the available campaigns STIX objects across all ATT&CK matrices
Args:
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
Returns:
List of STIX objects
"""

enterprise_campaigns = self.get_enterprise_campaigns()
mobile_campaigns = self.get_mobile_campaigns()
for mc in mobile_campaigns:
if mc not in enterprise_campaigns:
enterprise_campaigns.append(mc)

if skip_revoked_deprecated:
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)

if not stix_format:
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)

return enterprise_campaigns

def get_techniques(self, include_subtechniques=True, skip_revoked_deprecated=True, enrich_data_sources=False, stix_format=True):
""" Extracts all the available techniques STIX objects across all ATT&CK matrices
Expand Down Expand Up @@ -1272,7 +1356,7 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True):
List of STIX objects
"""
valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component'}
valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component', 'campaign'}
if object_type not in valid_objects:
raise ValueError(f"ERROR: Valid object must be one of {valid_objects}")
else:
Expand All @@ -1285,6 +1369,36 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True):
all_stix_objects = self.translate_stix_objects(all_stix_objects)
return all_stix_objects

def get_campaign_by_alias(self, campaign_alias, case=True, stix_format=True):
""" Extracts campaign STIX objects by alias name accross all ATT&CK matrices
Args:
campaign_alias (str) : Alias of threat actor group
case (bool) : case sensitive or not
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
Returns:
List of STIX objects
"""
if not case:
all_campaigns = self.get_campaigns()
all_campaigns_list = list()
for campaign in all_campaigns:
if "aliases" in campaign.keys():
for alias in campaign['aliases']:
if campaign_alias.lower() in alias.lower():
all_campaigns_list.append(campaign)
else:
filter_objects = [
Filter('type', '=', 'campaign'),
Filter('aliases', '=', campaign_alias)
]
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
if not stix_format:
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
return all_campaigns_list

def get_group_by_alias(self, group_alias, case=True, stix_format=True):
""" Extracts group STIX objects by alias name accross all ATT&CK matrices
Expand Down Expand Up @@ -1314,7 +1428,27 @@ def get_group_by_alias(self, group_alias, case=True, stix_format=True):
if not stix_format:
all_groups_list = self.translate_stix_objects(all_groups_list)
return all_groups_list


def get_campaigns_since_time(self, timestamp, stix_format=True):
""" Extracts campaings STIX objects since specific time accross all ATT&CK matrices
Args:
timestamp (timestamp): Timestamp
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
Returns:
List of STIX objects
"""
filter_objects = [
Filter('type', '=', 'campaign'),
Filter('created', '>', timestamp)
]
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
if not stix_format:
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
return all_campaigns_list

def get_techniques_since_time(self, timestamp, stix_format=True):
""" Extracts techniques STIX objects since specific time accross all ATT&CK matrices
Expand Down Expand Up @@ -1807,4 +1941,4 @@ def enrich_techniques_data_sources(self, stix_object):
if technique_ds:
new_data_sources = [ v for v in technique_ds.values()]
stix_object[i] = stix_object[i].new_version(x_mitre_data_sources = new_data_sources)
return stix_object
return stix_object

0 comments on commit 3bcac3b

Please sign in to comment.