Skip to content

Commit

Permalink
adversary fact analysis endpoint working state
Browse files Browse the repository at this point in the history
  • Loading branch information
mkouremetis committed Apr 17, 2023
1 parent 453a35d commit 7c2bfac
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
2 changes: 1 addition & 1 deletion app/api/v2/managers/adversary_api_manager.py
Expand Up @@ -21,4 +21,4 @@ async def fact_analysis(self, ram_key, adversary_id: str = None, atomic_ordering
else:
fa['errors'].append('Could not find adversary, or "atomic_ordering" field not supplied.')
return
return await self._planning_svc.adversary_fact_requirements(adversary)
return await self._planning_svc.adversary_fact_requirements(adversary, dump=True)
57 changes: 34 additions & 23 deletions app/service/planning_svc.py
Expand Up @@ -265,11 +265,9 @@ async def update_stopping_condition_met(self, planner, operation):
planner.stopping_condition_met = await self.check_stopping_conditions(planner.stopping_conditions,
operation)

async def adversary_fact_requirements(self, adversary):
async def adversary_fact_requirements(self, adversary, dump=False):
""" """
missing_fact_reqs = dict(missing_requirements=[], errors=[])
all_produced_facts = []
fullfillment_abilities = []
all_missing_facts = set([])
NON_FACT_VARS = set(['server', 'group', ])

Expand All @@ -280,46 +278,59 @@ async def adversary_fact_requirements(self, adversary):
missing_fact_reqs['errors'].append('Ability (ID: {ability_id}) not found')
continue

self.log.error(f'-----Ability: {ability[0].name}')
#self.log.error(f'-----Ability: {ability[0].name}')

# record produced and required facts
executors = ability[0].get_executors()
ability = ability[0]
executors = ability.get_executors()
for _platform, name_executor in executors.items():
for _name, executor in name_executor.items():
required_facts = set([])

produced_facts = set([])
# record required facts
for fact_ in re.findall(BasePlanningService.re_variable, executor.test):
required_facts.add(fact_)
self.log.error(f'--------Required Fact: {fact_}')

#self.log.error(f'--------Required Fact: {fact_}')
# record produced facts
for parser in executor.parsers:
for parserconfig in parser.parserconfigs:
for fact_type in ['source', 'target', 'edge']:
fact_ = getattr(parserconfig, fact_type, False)
if fact_:
all_produced_facts.add(fact_)
self.log.error(f'--------Produced Fact: {fact_}')
produced_facts.add(fact_)
#self.log.error(f'--------Produced Fact: {fact_}')

required_fact_diff = (required_facts - all_produced_facts) - NON_FACT_VARS
required_fact_diff = (required_facts - produced_facts) - NON_FACT_VARS
if required_fact_diff:
all_missing_facts.update(required_fact_diff)
missing_fact_reqs['detailed_missing_requirements'].append(dict(
missing_fact_reqs['missing_requirements'].append(dict(
step=i,
ability=ability.display,
missing_required_facts=required_fact_diff,
fullfillment_abilities=set([]),
fullfillment_tactics=set([])
ability=ability.display if dump else ability,
missing_required_facts=list(required_fact_diff),
fulfillment_abilities=[],
fulfillment_tactics=[],
_added_abilities=set([]) # just used to prevent duplicates in adding in abilities in next step, removed before return
))

fullfillment_abilities = await self.get_abilities_by_facts(produced_facts=all_missing_facts)
for fa in fullfillment_abilities:
fact, ability = fa
for missing_req in missing_fact_reqs['detailed_missing_requirements']:
if fact in missing_req['missing_required_facts']:
missing_req['fulfillment_abilities'].add(ability.display),
missing_req['fulfillment_tactics'].add(dict(tactic=ability['tactic'], technique=ability['technique'], technique_id=ability['technique_id']))
# Retrieve abilities for all facts at once, as each call results in loop over all abilities (and is expensive)
fulfillment_abilities = await self.get_abilities_by_facts(produced_facts=all_missing_facts)

# add abilities to correponding produced facts
for fact_name, abilities in fulfillment_abilities['produces'].items():
for ability_ in abilities:
for missing_req in missing_fact_reqs['missing_requirements']:
if fact_name in missing_req['missing_required_facts']:
if ability_.ability_id not in missing_req['_added_abilities']:
missing_req['fulfillment_abilities'].append(ability_.display if dump else ability_),
missing_req['fulfillment_tactics'].append(dict(
tactic=ability_.tactic,
technique_name=ability_.technique_name,
technique_id=ability_.technique_id))
missing_req['_added_abilities'].add(ability_.ability_id)

# Remove ability insert record
for missing_req in missing_fact_reqs['missing_requirements']:
del missing_req['_added_abilities']

return missing_fact_reqs

Expand Down

0 comments on commit 7c2bfac

Please sign in to comment.