176 changes: 174 additions & 2 deletions capa/capabilities/static.py
@@ -17,14 +17,19 @@
import logging
import itertools
import collections
from typing import Optional, Union
from dataclasses import dataclass

import intervaltree

import capa.perf
import capa.helpers
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.common import Result
from capa.features.address import Address, SuperblockAddress
from capa.capabilities.common import Capabilities, find_file_capabilities
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor

@@ -110,11 +115,157 @@ def find_basic_block_capabilities(
@dataclass
class CodeCapabilities:
function_matches: MatchResults
superblock_matches: MatchResults
basic_block_matches: MatchResults
instruction_matches: MatchResults
feature_count: int


@dataclass
class FlowGraphNode:
# This dataclass is used to construct the function's basic block flow graph.
# Some analysis backends provide native flow graph support, but we construct one here regardless
# to reduce the amount of code required on the analysis backend's side (the feature extractors).
bb: BBHandle
left: Optional[Address]
right: Optional[Address]


class SuperblockMatcher:
def __init__(self, ruleset: RuleSet, extractor: StaticFeatureExtractor):
self.ruleset: RuleSet = ruleset
self.extractor: StaticFeatureExtractor = extractor
self.features: FeatureSet = collections.defaultdict(set)
self.matches: MatchResults = collections.defaultdict(list)
self.flow_graph: dict[Address, FlowGraphNode] = dict()
self.addr_to_bb = intervaltree.IntervalTree()

def add_basic_block(self, bb: BBHandle, features: FeatureSet):
"""
Register a basic block and its features into the superblock matcher.

The basic block is added to the flowgraph tree maintained by the matcher object,
and its features are added to the global feature set maintained by the matcher object.

Capabilities will later be matched against all the registered features (as if they were extracted at the same level),
and then pruned so that only capabilities matched on superblocks are kept (i.e., basic blocks
in series with no interruptions between them).
"""
# Get the basic blocks that follow the current one.
branches = list(self.extractor.get_next_basic_blocks(bb))
# Get the current bb's size
bb_size = self.extractor.get_basic_block_size(bb)

# Add the basic block to the flow graph.
self.flow_graph[bb.address] = FlowGraphNode(
bb=bb,
left=branches[0] if branches else None,
right=branches[1] if len(branches) > 1 else None,
)

# Register bb's address space in the interval tree.
# This will later be used to determine the bb that a feature was extracted from.
if bb_size != 0:
self.addr_to_bb[bb.address : bb.address + bb_size] = bb.address

# Add features extracted from this bb into the matcher's overall collection of features.
for feature, va in features.items():
self.features[feature].update(va)

def _prune(self):
# go through each rule in self.matches, and each match in self.matches[rule_name],
# and then check if the self.matches[rule_name].locations or locations in each child of self.matches[rule_name].children
# have basic block gaps in them. If so, remove that specific match from self.matches[rule_name].
# if self.matches[rule_name] then becomes empty, remove it from self.matches.
def get_superblock_head(bb_locations: set[Address]) -> list[Address]:
cycles: set[Address] = set()
cycle: list[Address] = []
while bb_locations:
# pick a cycle head
node: Optional[Union[Address, FlowGraphNode]] = self.flow_graph[bb_locations.pop()]
cycles.add(node.bb.address)
current_cycle = []
while node:
current_cycle.append(node.bb.address)
# find all nodes in that cycle
if node.left in locations and node.right in locations:
# Do not accept superblock matches that have both left and right branches, because features
# from the two disjoint blocks could be incorrectly correlated.
# It is possible however that a bb would loop back to an earlier block that exists at
# a bb in a parallel superblock. Attention should be paid to this case when writing rules.
return []
if node.left in locations:
node = node.left
elif node.right in locations:
node = node.right
else:
node = None
if node in bb_locations:
# next node in cycle found in locations
bb_locations.remove(node)
node = self.flow_graph[node]
elif node in cycles:
# the cycle we embarked on connects with a previously identified one.
cycles.remove(node)
cycle = current_cycle + cycle
break
else:
# reached the end of the cycle: either there is no next node or this is the last bb in the function.
cycle = cycle + current_cycle
break

if len(cycles) == 1:
# has a single cycle (i.e., superblock).
return cycle
else:
# has multiple disjoint cycles.
return []

def get_bbs_from_locations(locations: set[Address]) -> set[Address]:
bbs_addresses = set()
for location in locations:
# map the feature location back to the address of the basic block that contains it.
# a location may fall outside every registered interval (e.g., a zero-size bb), so guard the lookup.
intervals = self.addr_to_bb[location]
if intervals:
bbs_addresses.add(next(iter(intervals)).data)
return bbs_addresses

def get_locations(result: Result) -> set[Address]:
# get all locations of found features in the result.
if not result.success:
# "not" statements are an edge case, but the locations of their children are not set anyway.
# Logically this is still valid because "not" is typically used to assert that features do not exist.
return set()
if result.children:
# statements are usually what return children, and they usually do not have locations themselves.
locations: set[Address] = set()
for child in result.children:
locations.update(get_locations(child))
return locations
if result.locations:
# we're dealing with a feature; return its locations.
return result.locations
return set()

pruned_matches: MatchResults = collections.defaultdict(list)
for rule_name, matches in self.matches.items():
for _, result in matches:
locations = get_locations(result)
features_bbs = get_bbs_from_locations(locations)
cycle = get_superblock_head(features_bbs)
if cycle and len(cycle) > 1:
# We consider a match only if it spans 2 or more contiguous basic blocks (i.e., a superblock).
# Superblocks containing a single basic block are discarded to avoid duplicating matches at the function level.
pruned_matches[rule_name].append((SuperblockAddress(cycle), result))

# update the list of valid matches.
self.matches = pruned_matches

def match(self, f_address: Address):
# match superblock rules against the collected features, then prune matches that do not form a contiguous superblock.
_, self.matches = self.ruleset.match(Scope.SUPERBLOCK, self.features, f_address)
self._prune()


def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle) -> CodeCapabilities:
"""
find matches for the given rules within the given function.
@@ -123,6 +274,9 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
# includes features found within basic blocks (and instructions).
function_features: FeatureSet = collections.defaultdict(set)

# matches found at the constituent superblocks of this function.
superblock_matches: MatchResults = collections.defaultdict(list)

# matches found at the basic block scope.
# might be found at different basic blocks, that's ok.
bb_matches: MatchResults = collections.defaultdict(list)
@@ -131,6 +285,8 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
# might be found at different instructions, that's ok.
insn_matches: MatchResults = collections.defaultdict(list)

superblock_matcher = SuperblockMatcher(ruleset, extractor)

for bb in extractor.get_basic_blocks(fh):
basic_block_capabilities = find_basic_block_capabilities(ruleset, extractor, fh, bb)
for feature, vas in basic_block_capabilities.features.items():
@@ -142,17 +298,29 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
for rule_name, res in basic_block_capabilities.instruction_matches.items():
insn_matches[rule_name].extend(res)

# register the basic block and its features with the superblock matcher.
superblock_matcher.add_basic_block(bb, basic_block_capabilities.features)

# match capabilities at the superblock scope once all basic blocks have been added.
superblock_matcher.match(fh.address)
for rule_name, res in superblock_matcher.matches.items():
superblock_matches[rule_name].extend(res)
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(function_features, rule, [va])

for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()):
function_features[feature].add(va)

_, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address)
return CodeCapabilities(function_matches, bb_matches, insn_matches, len(function_features))
return CodeCapabilities(function_matches, superblock_matches, bb_matches, insn_matches, len(function_features))


def find_static_capabilities(
ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None
) -> Capabilities:
all_function_matches: MatchResults = collections.defaultdict(list)
all_superblock_matches: MatchResults = collections.defaultdict(list)
all_bb_matches: MatchResults = collections.defaultdict(list)
all_insn_matches: MatchResults = collections.defaultdict(list)

@@ -196,6 +364,7 @@ def find_static_capabilities(
match_count = 0
for name, matches_ in itertools.chain(
code_capabilities.function_matches.items(),
code_capabilities.superblock_matches.items(),
code_capabilities.basic_block_matches.items(),
code_capabilities.instruction_matches.items(),
):
@@ -212,6 +381,8 @@ def find_static_capabilities(

for rule_name, res in code_capabilities.function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in code_capabilities.superblock_matches.items():
all_superblock_matches[rule_name].extend(res)
for rule_name, res in code_capabilities.basic_block_matches.items():
all_bb_matches[rule_name].extend(res)
for rule_name, res in code_capabilities.instruction_matches.items():
@@ -223,7 +394,7 @@ def find_static_capabilities(
# mapping from feature (matched rule) to set of addresses at which it matched.
function_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(
all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items()
all_function_matches.items(), all_superblock_matches.items(), all_bb_matches.items(), all_insn_matches.items()
):
locations = {p[0] for p in results}
rule = ruleset[rule_name]
@@ -239,6 +410,7 @@ def find_static_capabilities(
# and we can merge the dictionaries naively.
all_insn_matches.items(),
all_bb_matches.items(),
all_superblock_matches.items(),
all_function_matches.items(),
all_file_capabilities.matches.items(),
)
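For orientation, a minimal sketch of how the new matcher is driven, mirroring the flow in find_code_capabilities above; ruleset (a RuleSet containing superblock-scope rules), extractor (a StaticFeatureExtractor), and fh (a FunctionHandle) are assumed to already exist:

from capa.capabilities.static import SuperblockMatcher, find_basic_block_capabilities

matcher = SuperblockMatcher(ruleset, extractor)
for bb in extractor.get_basic_blocks(fh):
    # collect the features found within each basic block and register the block,
    # together with those features, with the matcher.
    bb_capabilities = find_basic_block_capabilities(ruleset, extractor, fh, bb)
    matcher.add_basic_block(bb, bb_capabilities.features)

# once every basic block is registered, match superblock-scope rules against the
# accumulated features and keep only matches spanning a contiguous chain of blocks.
matcher.match(fh.address)
for rule_name, results in matcher.matches.items():
    for superblock_address, _ in results:
        print(rule_name, superblock_address)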
32 changes: 32 additions & 0 deletions capa/features/address.py
@@ -49,6 +49,38 @@ def __hash__(self):
return int.__hash__(self)


class SuperblockAddress(Address):
"""an address of a superblock in a dynamic execution trace"""

def __init__(self, addresses: list[Address]):
for address in addresses:
assert address >= 0
self.addresses: list[Address] = addresses

def __repr__(self):
return "superblock(" + " -> ".join([f"0x{address:x}" for address in self.addresses]) + ")"

def __hash__(self):
return hash(tuple(self.addresses))

def __eq__(self, other):
assert isinstance(other, SuperblockAddress)
return self.addresses == other.addresses

def __lt__(self, other):
assert isinstance(other, SuperblockAddress)
if not other.addresses or not self.addresses:
return False

if self.addresses[0] != other.addresses[0]:
return self.addresses[0] < other.addresses[0]
else:
return len(self.addresses) < len(other.addresses)

def __contains__(self, address: Address):
return address in self.addresses


class ProcessAddress(Address):
"""an address of a process in a dynamic execution trace"""

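A small usage sketch of the ordering and containment semantics defined above (the addresses are made up for illustration):

from capa.features.address import AbsoluteVirtualAddress, SuperblockAddress

a = SuperblockAddress([AbsoluteVirtualAddress(0x401000), AbsoluteVirtualAddress(0x401020)])
b = SuperblockAddress([AbsoluteVirtualAddress(0x401000)])

print(a)                                      # superblock(0x401000 -> 0x401020)
print(AbsoluteVirtualAddress(0x401020) in a)  # True, via __contains__
print(b < a)                                  # True: same head block, so the shorter chain sorts first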
14 changes: 14 additions & 0 deletions capa/features/extractors/base_extractor.py
@@ -265,6 +265,20 @@ def extract_basic_block_features(self, f: FunctionHandle, bb: BBHandle) -> Itera
"""
raise NotImplementedError()

@abc.abstractmethod
def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[Address]:
"""
for a given basic block, retrieve the basic blocks that follow it (if any).
"""
raise NotImplementedError()

@abc.abstractmethod
def get_basic_block_size(self, bb: BBHandle) -> int:
"""
get the size of the given basic block.
"""
raise NotImplementedError()

@abc.abstractmethod
def get_instructions(self, f: FunctionHandle, bb: BBHandle) -> Iterator[InsnHandle]:
"""
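As a rough sketch of what a backend must now provide, the two new hooks only need to expose successor addresses and block sizes; the inner attributes used here (successors, size) are hypothetical, and the concrete Binary Ninja, dnfile, and vivisect implementations follow below:

from typing import Iterator

from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, StaticFeatureExtractor

class MyBackendExtractor(StaticFeatureExtractor):
    # hypothetical backend; the remaining abstract methods are omitted from this sketch.
    def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[Address]:
        # yield the start address of every basic block reachable from bb.
        for successor_va in bb.inner.successors:
            yield AbsoluteVirtualAddress(successor_va)

    def get_basic_block_size(self, bb: BBHandle) -> int:
        # size of the block in bytes, used to build the address-to-block interval tree.
        return bb.inner.size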
7 changes: 7 additions & 0 deletions capa/features/extractors/binja/extractor.py
@@ -63,6 +63,13 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
for bb in f.basic_blocks:
yield BBHandle(address=AbsoluteVirtualAddress(bb.start), inner=bb)

def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[AbsoluteVirtualAddress]:
for edge in bb.inner.outgoing_edges:
yield AbsoluteVirtualAddress(edge.target.start)

def get_basic_block_size(self, bb: BBHandle) -> int:
return bb.inner.length

def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.binja.basicblock.extract_features(fh, bbh)

6 changes: 6 additions & 0 deletions capa/features/extractors/dnfile/extractor.py
@@ -153,6 +153,12 @@ def get_basic_blocks(self, f) -> Iterator[BBHandle]:
inner=f.inner,
)

def get_next_basic_blocks(self, bb):
yield from []

def get_basic_block_size(self, bb: BBHandle) -> int:
return bb.inner.code_size

def extract_basic_block_features(self, fh, bbh):
# we don't support basic block features
yield from []
9 changes: 9 additions & 0 deletions capa/features/extractors/viv/extractor.py
@@ -16,6 +16,7 @@
from typing import Any, Iterator
from pathlib import Path

import envi
import viv_utils
import viv_utils.flirt

@@ -76,6 +77,14 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
for bb in f.basic_blocks:
yield BBHandle(address=AbsoluteVirtualAddress(bb.va), inner=bb)

def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[AbsoluteVirtualAddress]:
for bva, bflags in bb.inner.instructions[-1].getBranches():
if bflags & envi.BR_COND:
yield AbsoluteVirtualAddress(bva)

def get_basic_block_size(self, bb: BBHandle) -> int:
return bb.inner.size

def extract_basic_block_features(self, fh: FunctionHandle, bbh) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.viv.basicblock.extract_features(fh, bbh)
