
"""
NOTE: This environment is deprecated but has been included as an example of how to create a specific environment.

Four Node Connected Environment
------------------------------------

This environment is made up of four nodes in the following topology:
                        +-------------------+
                        |                   |
                        |                   |
         +--------------+   Node 1 - UAD    +---------------+
         |              |     Red Start     |               |
         |              |                   |     +---------+---------+
+--------+----------+   |                   |     |                   |
|                   |   +-------------------+     |                   |
|                   |                             |    Node 3 - UAD   |
|  Node 2 - UAD     |                             |                   |
|                   |   +-------------------+     |                   |
|                   |   |                   |     +---------+---------+
+-------+-----------+   |   Node 4 - UAD    |               |
        |               |    Objective      |               |
        +---------------+                   +---------------+
                        |                   |
                        |                   |
                        +-------------------+

The aim of this environment is for a blue team agent, which has full observability of the
environment, to successfully stop the hard-coded red agent from reaching the objective.

Inspired by:
- https://github.com/panlybero/MARL-POMDP-CYBERSECURITY
- https://www.scitepress.org/Link.aspx?doi=10.5220%2f0006197105590566
"""
import logging
from typing import Tuple

import gym
import networkx as nx
import numpy as np

from yawning_titan.agents.fixed_red import FixedRedAgent
from yawning_titan.agents.simple_blue import SimpleBlue
from yawning_titan.envs.generic.helpers.graph2plot import CustomEnvGraph
from yawning_titan.envs.specific.core import node_states as nodes
from yawning_titan.envs.specific.core.machines import Machines

logger = logging.getLogger(__name__)


class FourNodeDef(gym.Env):
    """Class that represents a simple four node connected network."""
    def __init__(
        self,
        attacker_skill: float = 90,
        red_start_node: int = 0,
        objective_node: int = 3,
        n_machines: int = 4,
        attack_success_threshold: float = 0.6,
    ):
        # Setting up the network
        self.n_machines = n_machines
        self.graph_adj_matrix = np.array(
            [0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0]
        ).reshape(4, 4)
        self.network = nx.from_numpy_array(self.graph_adj_matrix)

        assert len(self.network) == self.n_machines
        logger.debug("Assert Pass: Length of network is equal to number of machines")

        self.pos = nx.spring_layout(self.network)

        # Setting up the environment spaces
        self.action_space = gym.spaces.Discrete(self.n_machines * 2)
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(self.n_machines, 2)
        )

        # Setting up Episode Settings
        self.reward_range = [-50, 50]
        self.total_rewards = 0
        self.total_no_of_steps = 0
        self.done = False

        # Setting up Machine States
        machines = Machines(n_machines=self.n_machines)
        self.machine_states = machines.machine_states
        self.initial_states = machines.initial_states
        self.uncompromised_nodes = None
        self.compromised_nodes = None

        # Setting up the blue agent's actions
        self.blue = SimpleBlue(n_machines=self.n_machines)

        # Setting up the objective for Red
        self.objective_node = objective_node

        # Setting up the red agent settings
        self.red_start_node = red_start_node
        self.red_previous_node = None
        self.red_current_node = self.red_start_node
        self.attacker_skill = attacker_skill
        self.attack_success_threshold = attack_success_threshold
        self.red_objective_accomplished = False
        self.RED = FixedRedAgent(skill=self.attacker_skill)
        self.target = None

        self.graph_plotter = None

        logger.debug("Experiment Started")
        logger.debug(f"Starting State: {self.initial_states}")
    def reset(self) -> np.array:
        """
        Reset the environment to the default state.

        Returns:
            A new starting observation (numpy array)
        """
        # Reset Machine States
        machines = Machines(n_machines=self.n_machines)
        self.machine_states = machines.machine_states
        self.initial_states = machines.get_initial_state()

        # Reset Episode Settings
        self.total_rewards = 0
        self.total_no_of_steps = 0
        self.done = False

        # Reset Red Agent
        self.red_objective_accomplished = False
        self.red_current_node = self.red_start_node
        self.RED = FixedRedAgent(skill=self.attacker_skill)

        logger.debug("Environment Reset")
        logger.debug(f"Starting State: {self.initial_states}")

        return self._observe()
    def step(self, action: int) -> Tuple[np.array, float, bool, dict]:
        """
        Take a time step and execute the actions for both the Blue RL agent and the hard-coded Red agent.

        Args:
            action: The action value generated from the Blue RL agent (int)

        Returns:
            observation: The next environment observation (numpy array)
            reward: The reward value for that timestep (int)
            done: Whether the episode is done (bool)
            info: A dictionary containing the blue action taken
        """
        logger.debug(f"Timestep - {self.total_no_of_steps}")
        logger.debug(
            f"Red Current: {self.red_current_node} Uncomp Node: {self.uncompromised_nodes} Comp Nodes: {self.compromised_nodes}"
        )

        self.uncompromised_nodes = nodes.get_linked_uncompromised_nodes(
            self.red_current_node, self.network, self.machine_states
        )
        self.compromised_nodes = nodes.get_linked_compromised_nodes(
            self.red_current_node, self.network, self.machine_states
        )

        red_action, self.target = self.RED.select_action(
            self.uncompromised_nodes, self.compromised_nodes
        )
        skill_level = self.RED.skill

        assert (self.red_current_node == 0 and self.target == 3) is False

        if red_action == 0:
            self.machine_states[self.target][1] = 1
            logger.debug(f"Red Team: Zero Day Used on {self.target + 1}")
            self.red_previous_node = self.red_current_node
            self.red_current_node = self.target
        elif red_action == 1:
            # Calculate attack power based on skill level and target vulnerability score
            attack = (skill_level * self.machine_states[self.target][0]) / 100
            logger.debug(f"Red Attack Power: {attack}")
            # If attack power is greater than the attack success threshold, compromise the machine
            if attack >= self.attack_success_threshold:
                # Compromised
                self.machine_states[self.target][1] = 1
                self.red_previous_node = self.red_current_node
                self.red_current_node = self.target
                logger.debug(f"Red Team: {attack} on target {self.target} - SUCCESS")
            else:
                logger.debug(f"Red Team: {attack} on target {self.target} - FAILED")
        elif red_action == 2:
            self.red_previous_node = self.red_current_node
            self.red_current_node = self.target
            logger.debug(f"Red Team: Moved to {self.target}")

        # BLUE TEAM AGENT
        self.blue.do_blue_action(action, self.machine_states, self.initial_states)

        if self.machine_states[3][1] == 1:
            self.red_objective_accomplished = True

        if self.machine_states[self.red_current_node][1] == 0:
            self.red_current_node = self.red_previous_node
            logger.debug("Blue Team pushed Red Team back")

        # Calculate Timestep Reward
        reward = self._get_reward()
        self.total_rewards += reward

        # Add to timestep counter
        self.total_no_of_steps += 1

        # Check if Episode is complete
        self._is_done()

        # Get next observation
        observation = self._observe()

        logger.debug(
            f"Total Reward: {self.total_rewards} Total No. of Steps : {self.total_no_of_steps}"
        )

        return observation, reward, self.done, {"action": action}
    def render(self, mode: str = "human"):
        """Render the environment to the screen so that it can be played in realtime."""
        if self.graph_plotter is None:
            self.graph_plotter = CustomEnvGraph()
        comp = {i: True for i in self.compromised_nodes}
        safe = self.uncompromised_nodes
        main_graph = self.network
        main_graph_pos = self.pos
        states = self.machine_states
        vulnerabilities = {}
        for counter, i in enumerate(states):
            vulnerabilities[counter] = i[0]

        self.graph_plotter.render(
            self.total_no_of_steps,
            main_graph,
            main_graph_pos,
            comp,
            safe,
            [],
            self.total_rewards,
            self.red_previous_node,
            vulnerabilities,
            [],
            "Four node network with an RL blue vs probabilistic red",
        )
    def _observe(self) -> np.array:
        """
        Create the next observation.

        Returns:
            A formatted observation array
        """
        observation = np.array(self.machine_states, dtype=np.float32)

        return observation

    def _get_reward(self) -> float:
        """
        Calculate the reward for the agent.

        The reward policy is set to incentivize survival and punish when red wins.

        Returns:
            A reward value for a time step
        """
        reward = 0

        if self.red_objective_accomplished is True:
            reward -= 1

        if self.total_no_of_steps == 499:
            reward += 1

        return reward

    def _is_done(self):
        """
        Determine if an episode has been completed.

        There are two terminal states:
        1) The red agent successfully compromises the objective
        2) The blue agent successfully survives 500 time steps

        Sets self.done to True if either is fulfilled.
        """
        if self.red_objective_accomplished is True:
            logger.debug(
                f"Red Team Wins - Game Over Blue Team Survived - {self.total_no_of_steps}"
            )
            self.done = True

        if self.total_no_of_steps == 500:
            logger.debug("Blue Team Wins - Game Over")
            self.done = True
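
# The block below is a minimal usage sketch and is not part of the original module. It assumes
# the yawning_titan packages imported above are installed and that this deprecated environment
# still follows the classic Gym API used here (reset() -> observation;
# step() -> observation, reward, done, info).
if __name__ == "__main__":
    env = FourNodeDef()
    obs = env.reset()
    done = False
    while not done:
        # Sample a random blue action; a trained RL agent would supply this instead.
        action = env.action_space.sample()
        obs, reward, done, info = env.step(action)
    print(
        f"Episode finished after {env.total_no_of_steps} steps "
        f"with total reward {env.total_rewards}"
    )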