Source code for yawning_titan.envs.specific.nsa_node_def

"""
A node-based network environment that supports multiple different configurations.

Paper:
https://www.nsa.gov/portals/70/documents/resources/everyone/digital-media-center/publications/the-next-wave/TNW-22-1.pdf

Currently supports:
    - the 18-node network from the research paper.
    - a network creator that allows you to use multiple topologies and change the connectivity of the network.

Red agent actions:
    Spread:
        Tries to spread to each node connected to a compromised node.
    Randomly infect:
        Tries to randomly infect every currently un-compromised node.
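
    As a sketch only (the exact mechanics live in the NSARed agent), "randomly
    infect" can be pictured as one independent Bernoulli trial per safe node:

        import random

        safe_nodes = [0, 1, 2]                   # hypothetical node ids
        chance_to_randomly_compromise = 0.15
        newly_infected = [
            n for n in safe_nodes
            if random.random() < chance_to_randomly_compromise
        ]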

Configurable parameters:
    chance_to_spread
        This is the chance for the red agent to spread between nodes.
    chance_to_spread_during_patch
        There is a chance that when a compromised node is patched the red agent "escapes" to nearby nodes and
        compromises them.
    chance_to_randomly_compromise
        This is the chance that the red agent randomly infects an un-compromised node.
    cost_of_isolate
        The cost (negative reward) associated with performing the isolate action (initially set to 10 based on data
        from the paper).
    cost_of_patch
        The cost (negative reward) associated with performing the patch action (initially set to 5 based on data from
        the paper).
    cost_of_nothing
        The cost (negative reward) associated with performing the do nothing action (initially set to 0
        based on data from the paper).
    end
        The number of steps that the blue agent must survive to win.
    spread_vs_random_intrusion
        The chance that the red agent will choose the spread action on its turn, as opposed to the random
        intrusion action.
    punish_for_isolate
        Either True or False. If True then each step the agent is punished based on the number of isolated
        nodes.
    reward_method
        Either 0, 1 or 2. Each constitutes a different method of rewarding the agent:
            - 0 is the paper's reward system.
            - 1 is my reward system, which rewards based on the number of un-compromised nodes.
            - 2 is the minimal reward system. The agent gets 1 for a win or -1 for a loss.
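
Example:
    A minimal usage sketch (parameter values here are illustrative, not
    recommendations from the paper):

        env = NodeEnv(chance_to_spread=0.05, reward_method=1)
        obs = env.reset()
        done = False
        while not done:
            # random blue agent, purely for demonstration
            obs, reward, done, info = env.step(env.action_space.sample())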

"""

import logging
from typing import Tuple

import gym
import numpy as np
from gym import spaces

from yawning_titan.agents.nsa_red import NSARed
from yawning_titan.envs.generic.helpers.graph2plot import CustomEnvGraph
from yawning_titan.envs.specific.core.nsa_node_collection import NodeCollection
from yawning_titan.networks.network import Network
from yawning_titan.networks.network_db import default_18_node_network

logger = logging.getLogger(__name__)


class NodeEnv(gym.Env):
    """Class that creates an environment similar to that presented in Ridley 17 (ref. above)."""

    def __init__(
        self,
        chance_to_spread: float = 0.01,
        chance_to_spread_during_patch: float = 0.01,
        chance_to_randomly_compromise: float = 0.15,
        cost_of_isolate: float = 10,
        cost_of_patch: float = 5,
        cost_of_nothing: float = 0,
        end: int = 1000,
        spread_vs_random_intrusion: float = 0.5,
        punish_for_isolate: bool = False,
        reward_method: int = 1,
        network: Network = None,
    ):
        super(NodeEnv, self).__init__()

        # sets up the configuration for the env
        self.chance_to_spread = chance_to_spread
        self.chance_to_spread_during_patch = chance_to_spread_during_patch
        self.chance_to_randomly_compromise = chance_to_randomly_compromise
        self.cost_of_isolate = cost_of_isolate
        self.cost_of_patch = cost_of_patch
        self.cost_of_nothing = cost_of_nothing
        self.end = end
        self.spread_vs_random_intrusion = spread_vs_random_intrusion
        self.punish_for_isolate = punish_for_isolate
        self.reward_method = reward_method
        self.network = network

        self.duration = 0
        self.graph_plotter = None

        # TODO: replace this with a full implementation of networkx Network
        if self.network is None:
            self.network = list(default_18_node_network().to_adj_matrix_and_positions())

        self.state = NodeCollection(self.network, self.chance_to_spread_during_patch)
        # skill, action_set, action_probabilities, node_set
        self.RED = NSARed(
            1,
            [3, 4],
            [self.spread_vs_random_intrusion, 1 - self.spread_vs_random_intrusion],
            [str(i) for i in range(self.state.get_number_of_nodes())],
        )

        self.action_space = spaces.Discrete(self.state.get_number_of_nodes() * 2 + 1)
        self.observation_space = spaces.Box(
            low=0,
            high=1,
            shape=(
                self.state.get_number_of_nodes()
                * (self.state.get_number_of_nodes() + 2),
            ),
        )
        self.env_observation = self.state.get_observation().flatten()

        logger.debug("Experiment Started")
        logger.debug(f"Starting State: {self.state.get_observation()}")
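
    # Sizing sketch (illustrative, derived from the expressions above): with the
    # default 18-node network, action_space is Discrete(18 * 2 + 1) = Discrete(37)
    # (one "do nothing" action plus an isolate and a patch action per node), and
    # observation_space is a Box of shape (18 * (18 + 2),) = (360,), matching the
    # flattened matrix returned by NodeCollection.get_observation().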

    def reset(self) -> np.ndarray:
        """
        Reset the environment to the default state.

        Returns:
            A new starting observation (numpy array)
        """
        self.state = NodeCollection(self.network, self.chance_to_spread_during_patch)
        self.duration = 0
        self.env_observation = self.state.get_observation().flatten()
        self.RED = NSARed(
            1,
            ["10", "11"],
            [self.spread_vs_random_intrusion, 1 - self.spread_vs_random_intrusion],
            [str(i) for i in range(self.state.get_number_of_nodes())],
        )

        logger.debug("Environment Reset")
        logger.debug(f"Starting State: {self.state.get_observation()}")

        return self.env_observation

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, dict]:
        """
        Take one timestep within the environment.

        Execute the actions for both the Blue RL agent and the hard-coded Red agent.

        Args:
            action: The action value generated from the Blue RL agent (int)

        Returns:
            observation: The next environment observation (numpy array)
            reward: The reward value for that timestep (int)
            done: Whether the episode is done (bool)
            info: A dictionary containing info about the current state
        """
        logger.debug(f"Timestep: {self.duration}")

        """RED AGENTS TURN"""
        red_action = self.RED.choose_action()
        self.RED.do_action(
            self.state,
            red_action,
            [self.chance_to_spread, self.chance_to_randomly_compromise],
        )
        number_uncompromised = self.state.get_number_of_un_compromised()
        reward = 0

        """BLUE AGENTS TURN"""
        # checks if there are no safe nodes left --> blue agent loss
        if number_uncompromised == 0:
            logger.debug("All states taken - Game Over")
            done = True
            if self.reward_method == 0 or self.reward_method == 1:
                reward = -500
            else:
                reward = -1
        else:
            done = False
            # if there are still some safe nodes then the blue agent takes their go
            if action == 0:
                # blue agent does nothing
                logger.debug("Blue Action: DO NOTHING")
                if self.reward_method == 0:
                    reward = -self.cost_of_nothing
                else:
                    reward = 0
            else:
                # converts the single number for the action into a node and action value
                action = action - 1
                action_node = int(action / 2)
                action_taken = int(action % 2)
                if action_taken == 0:
                    # blue agent changes the isolation status on a node (if True then makes it False, etc.)
                    logger.debug(f"Blue Action: ISOLATE NODE {action_node}")
                    self.state.modify_node(action_node, (True, 0))
                    # rewards for the action
                    if self.reward_method == 0:
                        reward = -self.cost_of_isolate
                    else:
                        reward = 0
                elif action_taken == 1:
                    # blue patches a node
                    logger.debug(f"Blue Action: PATCH NODE {action_node}")
                    self.state.modify_node(action_node, (False, 1))
                    # if a node is infected and patched then the red agent can spread to connected nodes
                    self.state.spread(action_node)
                    # rewards for the action
                    if self.reward_method == 0:
                        reward = -self.cost_of_patch
                    else:
                        reward = 0

            # rewards for the current step
            if self.reward_method == 0:
                reward = reward + 3 * self.state.calculate_reward()
            elif self.reward_method == 1:
                reward = reward + self.state.calculate_reward()

        # gets the current observation from the environment
        self.env_observation = self.state.get_observation().flatten()
        self.duration += 1

        # if the total number of steps reaches the set end then the blue agent wins and is rewarded accordingly
        if self.duration == self.end:
            done = True
            if self.reward_method == 0 or self.reward_method == 1:
                reward = 500
            else:
                reward = 1

        # If turned on then the agent is punished for keeping nodes isolated
        if self.punish_for_isolate:
            reward = reward - 0.5 * self.state.get_number_of_isolated()

        logger.debug(
            f"Total Reward: {reward} Total No. of Steps: {self.duration} "
            f"No. of Compromised Machines: {len(self.state.get_compromised_nodes())} "
        )

        return (
            self.env_observation,
            reward,
            done,
            {"nodes": self.state.get_number_of_un_compromised()},
        )
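
    # Action encoding sketch (derived from the decoding in step() above):
    # action 0 is "do nothing"; for action > 0, node = (action - 1) // 2 and
    # (action - 1) % 2 selects the move (0 = toggle isolation, 1 = patch).
    # So actions 1 and 2 target node 0, actions 3 and 4 target node 1, etc.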

    def render(self, mode: str = "human"):
        """
        Render the network using the graph2plot class.

        This uses a networkx representation of the network.

        Args:
            mode: the mode of the rendering
        """
        if self.graph_plotter is None:
            self.graph_plotter = CustomEnvGraph()
        comp = list(map(str, self.state.get_compromised_nodes()))
        safe = list(map(str, self.state.get_un_compromised_nodes()))
        main_graph = self.state.get_netx_graph()
        main_graph_pos = self.state.get_netx_pos()

        reward = round(self.state.calculate_reward(), 2)

        self.graph_plotter.render(
            self.duration,
            main_graph,
            main_graph_pos,
            {i: 1 for i in comp},
            safe,
            [],
            reward,
            None,
            {i: 0.5 for i in main_graph.nodes},
            [],
            "",
        )