import logging
import random
from typing import List, Tuple, Union
logger = logging.getLogger(__name__)
[docs]class NSARed:
"""
Provides the red agent behaviour within the `18-node-def` environment.
The agent is a loose replication of:
https://www.nsa.gov/portals/70/documents/resources/everyone/digital-media-center/publications/the-next-wave/TNW-22-1.pdf#page=9
"""
[docs] def __init__(
self,
skill: float,
action_set: List,
action_probabilities: List,
node_set: List,
zd_start_amount: int = 0,
zd_gain: int = 0,
zd_required: int = 10,
):
self.skill = skill
self.action_set = action_set
self.action_probabilities = action_probabilities
self.node_set = node_set
self.zd_amount = zd_start_amount
self.zg_gain = zd_gain
self.zd_required = zd_required
self.zd_current_day = 0
[docs] def update_node_set(self, nodes: List[int]):
"""
Update the set of nodes the agent can act upon.
Args:
nodes: A list containing the nodes
"""
self.node_set = nodes
[docs] def update_actions(self, actions: List[str], probabilities: List[float]):
"""
Update the set of actions and the corresponding probabilities.
Args:
actions: the new set of actions the agent can take
probabilities: the weights associated with the actions
"""
self.action_set = actions
self.action_probabilities = probabilities
[docs] def choose_node(self) -> int:
"""
Choose a node to act on.
Returns:
The node to act on
"""
node = random.choices(self.node_set)[0]
return node
[docs] def choose_action(self) -> str:
"""
Choose an action to perform.
Returns:
The action to perform on
"""
action = random.choices(self.action_set, weights=self.action_probabilities)[0]
return action
[docs] def update_location(self, target: int, red_current_node: int) -> Tuple[int, int]:
"""
Update the current location of the red agent.
Args:
target: the location to move to
red_current_node: red agents current node
Returns:
Reds previous node
The node red is currently in
"""
red_previous_node = red_current_node
red_current_node = target
return red_previous_node, red_current_node
[docs] def increment_day(self):
"""
Increment the day related to zero day development.
If the day has reached a threshold for a zero day then add 1 to the number of available
zero days.
"""
if self.zd_current_day == self.zd_required:
self.zd_amount += 1
self.zd_current_day = 0
else:
self.zd_current_day += 1
[docs] def check_zd_available(self) -> bool:
"""
Check if a zero day is available .
Returns:
True if a zero day is available to use
False if no zero day is available to use
"""
if self.zd_amount > 0:
self.zd_amount -= 1
return True
else:
return False
[docs] def zd_attack(self, target: int, args: Tuple[List[List[float]], int, int, bool]):
"""
Perform a zd attack on a targetted node.
Args:
target: the target node
args: A tuple containing:
* "machine_states": the current state of the machines
* "red_current_node": the red agents current node
* "able_to_move": if the red agent is able to move
"""
# extracts args
[machine_states, red_current_node, _, able_to_move] = args
machine_states[target][1] = 1
logger.debug(f"Red Team: Zero Day Used on {target + 1}")
if able_to_move:
self.update_location(target, red_current_node)
[docs] def basic_attack(self, target: int, args: Tuple[List[List[float]], int, int, bool]):
"""
Perform a basic attack on a targetted node.
Args:
target: The target node
args: A tuple containing:
* "machine_states": the current state of the machines
* "skill_level": The skill level of the attack
* "attack_success_threshold": A threshold to determine if the attack succeeds
* "red_current_node": The current position of the red agent
* "able_to_move": if the red agent is able to move
"""
[
machine_states,
red_current_node,
attack_success_threshold,
able_to_move,
] = args
# Calculate Attack power based on skill level and target vulnerability score/
attack = (self.skill * machine_states[target][0]) / 100
logger.debug(f"Red Attack Power: {attack}")
# If Attack Power greater than ATTACK_SUCCESS_THRESHOLD, compromise machine
if attack >= attack_success_threshold:
machine_states[target][1] = 1 # Compromised
logger.debug(f"Red Team: {attack} on target {target + 1} - SUCCESS")
if able_to_move:
self.update_location(target, red_current_node)
else:
# If Attack Power below ATTACK_SUCCESS_THRESHOLD, attack failed
logger.debug(f"Red Team: {attack} on target {target + 1} - FAILED")
[docs] def move(self, target: int, args: Tuple[List[List[float]], int, int, bool]):
"""
Move the red agent from one node to another.
Args:
target: the node the agent is moving to
args: A tuple containing the following:
* "red_current_node": the current node of the agent
* "able_to_move": if the agent is able to move
"""
[_, red_current_node, _, able_to_move] = args
if able_to_move:
logger.debug(f"Red Team: Moved to target {target + 1}")
self.update_location(target, red_current_node)
[docs] def spread(self, state, args: Tuple[float, float]):
"""
Attempt to spread to all nodes connected to a compromised node.
Args:
state: the current state of the environment
args: A tuple containing:
* "chance_to_spread": the chance to spread from one node to another
* "chance_to_randomly_compromise": chance to randomly infect a node
"""
[chance_to_spread, _] = args
logger.debug("Red Action: SPREAD")
compromised_nodes = state.get_compromised_nodes()
# runs through all of the compromised nodes
for i in compromised_nodes:
connected = state.get_connected_nodes(i)
for j in connected:
# tries to spread to every connected node
chance = random.randint(0, 100)
if chance < chance_to_spread * 100:
state.modify_node(j, [False, 2])
logger.debug(f"Spread from: {i} to {j}")
[docs] def intrude(self, state, args: Tuple[float, float]):
"""
Attempt to randomly intrude every uncompromised node.
Args:
state: the current state of the environment
args: A tuple containing:
* "chance_to_spread": the chance to spread from one node to another
* "chance_to_randomly_compromise": chance to randomly infect a node
"""
[_, chance_to_randomly_compromise] = args
logger.debug("Red Action: RANDOM INTRUSION")
uncompromised_nodes = state.get_un_compromised_nodes()
# tries to compromise every un-compromised node
for i in uncompromised_nodes:
chance = random.randint(0, 100)
if chance < chance_to_randomly_compromise * 100:
state.modify_node(i, [False, 2])
logger.debug(f"Compromised: {i}")
[docs] def do_action(
self,
nodes: Union[Union[List[int], int], "NodeCollection"], # noqa
action: str,
args: Union[List[float], Tuple[List[List[int]], int, bool]],
):
"""
Perform the selected action on the given target nodes.
Args:
nodes: the nodes to perform the action on
action: the action to perform
0* - actions for the 5 node env
1* - actions for the new node env
args: any parameters needed to perform the action
"""
dispatch = {
"00": self.zd_attack,
"01": self.basic_attack,
"02": self.move,
"10": self.spread,
"11": self.intrude,
}
# args: 0: machine_states, 1: red_current_node, 2: attack_sucsess_threshold, 3: able_to_move
# args: spread, random infect
dispatch[action](nodes, args)