"""Provides an API for the ``network.json`` TinyDB file, and a Schema class that defines the network DB fields."""
from __future__ import annotations
import os
from logging import getLogger
from pathlib import Path
from typing import Final, List, Optional, Union
from tinydb import TinyDB
from tinydb.queries import QueryInstance
from yawning_titan.db.doc_metadata import DocMetadataSchema
from yawning_titan.db.query import YawningTitanQuery
from yawning_titan.db.yawning_titan_db import YawningTitanDB, YawningTitanDBSchema
from yawning_titan.networks.network import Network
__all__ = ["NetworkDB", "NetworkSchema", "default_18_node_network"]
_LOGGER = getLogger(__name__)
[docs]class NetworkQuery(YawningTitanQuery):
def __int__(self):
super().__init__()
[docs] @staticmethod
def num_of_nodes(n: int) -> YawningTitanQuery:
"""Returns all Networks with n number of nodes.
:Example:
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkQuery
>>> db = NetworkDB()
>>> networks = db.search(NetworkQuery.num_of_nodes(18))
:param n: The target number of nodes in a Network.
:return: A list of Networks.
"""
return YawningTitanQuery()["nodes"].len_eq(n)
[docs] @staticmethod
def num_of_nodes_between(min: int, max: int) -> YawningTitanQuery:
"""Returns all Networks with between `min` and `max` number of nodes.
:Example:
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkQuery
>>> db = NetworkDB()
>>> networks = db.search(NetworkQuery.num_of_nodes(18))
:param min: The minimum number of nodes in a Network.
:param max: The maximum number of nodes in a Network.
:return: A list of Networks.
"""
return YawningTitanQuery()["nodes"].len_bt(min, max)
def _num_nodes_of_type(self, n, type):
"""Helper function for num_of_entry_nodes."""
def test_len(val, i):
try:
nodes = [n for n in val.values() if val[type]]
return len(nodes) == i
except TypeError:
return False
return self.test(test_len, n)
def _num_nodes_of_type_between(self, min, max, type):
"""Helper function for num_of_entry_nodes."""
def test_len(val, min, max, type):
try:
nodes = [n for n in val.values() if n[type]]
return min <= len(nodes) <= max
except TypeError:
return False
return self.test(test_len, min, max, type)
[docs] @staticmethod
def num_of_entry_nodes(n: int) -> YawningTitanQuery:
"""
Returns all Networks with n number of entry nodes.
:Example:
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkQuery
>>> db = NetworkDB()
>>> networks = db.search(NetworkQuery.num_of_entry_nodes(3))
:param n: The target number of entry nodes.
:return: A List of Nodes.
"""
return NetworkQuery().nodes._num_nodes_of_type(n, "entry_node")
[docs] @staticmethod
def num_of_entry_nodes_between(min: int, max: int) -> YawningTitanQuery:
"""
Returns all Networks with between `min` and `max` number of entry nodes.
:Example:
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkQuery
>>> db = NetworkDB()
>>> networks = db.search(NetworkQuery.num_of_entry_nodes_between(3,6))
:param n: The target number of entry nodes.
:return: A List of Nodes.
"""
return NetworkQuery().nodes._num_nodes_of_type_between(min, max, "entry_node")
[docs] @staticmethod
def num_of_high_value_nodes(n: int) -> YawningTitanQuery:
"""
Returns all Networks with n number of high_value nodes.
:Example:
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkQuery
>>> db = NetworkDB()
>>> networks = db.search(NetworkQuery.num_of_high_value_nodes(3))
:param min: The minimum number of high_value nodes.
:param max: The maximum number of high_value nodes.
:return: A List of Nodes.
"""
return NetworkQuery().nodes._num_nodes_of_type(n, "high_value_node")
[docs] @staticmethod
def num_of_high_value_nodes_between(min: int, max: int) -> YawningTitanQuery:
"""
Returns all Networks with between `min` and `max` number of high value nodes.
:Example:
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkQuery
>>> db = NetworkDB()
>>> networks = db.search(NetworkQuery.num_of_high_value_nodes_between(3,6))
:param min: The minimum number of high_value nodes.
:param max: The minimum number of high_value nodes.
:return: A List of Nodes.
"""
return NetworkQuery().nodes._num_nodes_of_type_between(
min, max, "high_value_node"
)
[docs]class NetworkSchema(YawningTitanDBSchema):
"""
A schema-like class that defines the network DB fields.
Fields are defined using the :class:`~yawning_titan.db.query.YawningTitanQuery` class
so that schema paths can be used directly within :func:`tinydb.table.Table.search`
function calls. All fields are mapped to a property in the
:class:`~yawning_titan.networks.network.Network` class.
:Example:
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkSchema
>>> db = NetworkDB()
>>> network_configs = db.search(NetworkSchema.SET_RANDOM_ENTRY_NODES == True)
"""
SET_RANDOM_ENTRY_NODES: Final[
YawningTitanQuery
] = YawningTitanQuery().set_random_entry_nodes
RANDOM_ENTRY_NODE_PREFERENCE: Final[
YawningTitanQuery
] = YawningTitanQuery().random_entry_node_preference
NUM_OF_RANDOM_ENTRY_NODES: Final[
YawningTitanQuery
] = YawningTitanQuery().num_of_random_entry_nodes
SET_RANDOM_HIGH_VALUE_NODES: Final[
YawningTitanQuery
] = YawningTitanQuery().set_random_high_value_nodes
RANDOM_HIGH_VALUE_NODE_PREFERENCE: Final[
YawningTitanQuery
] = YawningTitanQuery().random_high_value_node_preference
NUM_OF_RANDOM_HIGH_VALUE_NODES: Final[
YawningTitanQuery
] = YawningTitanQuery().num_of_random_high_value_nodes
SET_RANDOM_VULNERABILITIES: Final[
YawningTitanQuery
] = YawningTitanQuery().set_random_vulnerabilities
NODE_VULNERABILITY_LOWER_BOUND: Final[
YawningTitanQuery
] = YawningTitanQuery().node_vulnerability_lower_bound
NODE_VULNERABILITY_UPPER_BOUND: Final[
YawningTitanQuery
] = YawningTitanQuery().node_vulnerability_upper_bound
[docs]class NetworkDB:
"""
The :class:`~yawning_titan.db.networks.NetworkDB` class extends :class:`~yawning_titan.db.YawningTitanDB`.
The below code blocks demonstrate how to use the :class:`~yawning_titan.db.networks.NetworkDB` class.
- Instantiate the Network DB:
.. code:: python
>>> from yawning_titan.networks.network_db import NetworkDB, NetworkSchema
>>> db = NetworkDB()
- Search for all network that have set_random_entry_nodes == True.
.. code:: python
>>> db.search(NetworkSchema.SET_RANDOM_ENTRY_NODES == True)
"""
[docs] def __init__(self):
self._db = YawningTitanDB("networks")
def __enter__(self) -> NetworkDB:
return NetworkDB()
def __exit__(self, exc_type, exc_val, exc_tb):
self._db.__exit__(exc_type, exc_val, exc_tb)
[docs] def insert(
self,
network: Network,
name: Optional[str] = None,
description: Optional[str] = None,
author: Optional[str] = None,
) -> Network:
"""
Insert a :class:`~yawning_titan.networks.network.Network` into the DB as ``.json``.
:param network: An instance of :class:`~yawning_titan.networks.network.Network`
:class:`~yawning_titan.db._doc_metadata.DocMetadata`.
:class:`~yawning_titan.db._doc_metadata.DocMetadata`.
:class:`~yawning_titan.db._doc_metadata.DocMetadata`.
:param name: The config name.
:param description: The config description.
:param author: The config author.
:return: The inserted :class:`~yawning_titan.networks.network.Network`.
"""
network.doc_metadata.update(name, description, author)
self._db.insert(network.to_dict(json_serializable=True))
return network
[docs] def all(self) -> List[Network]:
"""
Get all :class:`~yawning_titan.networks.network.Network` from the network DB.
:return: A :class:`list` of :class:`~yawning_titan.networks.network.Network`.
"""
return [Network.create(doc) for doc in self._db.all()]
[docs] def show(self, verbose=False):
"""
Show details of all entries in the db.
:param verbose: If True, all doc metadata details are shown,
otherwise just the name is shown.
"""
self._db.show(verbose)
[docs] def get(self, uuid: str) -> Union[Network, None]:
"""
Get a network config document from its uuid.
:param uuid: A target document uuid.
:return: The network config document as an instance of
:class:`~yawning_titan.networks.network.Network` if the uuid exists,
otherwise :py:class:`None`.
"""
# self._db.db.clear_cache()
doc = self._db.get(uuid)
if doc:
return Network.create(doc)
[docs] def search(self, query: YawningTitanQuery) -> List[Network]:
"""
Searches the :class:`~yawning_titan.networks.network.Network` with a :class:`NetworkSchema` query.
:param query: A :class:`~yawning_titan.db.query.YawningTitanQuery`.
:return: A :class:`list` of :class:`~yawning_titan.networks.network.Network`.
"""
network_configs = []
for doc in self._db.search(query):
network_configs.append(Network.create(doc))
return network_configs
[docs] def count(self, cond: Optional[QueryInstance] = None) -> int:
"""
Count how many docs are in the db. Extends :class:`tinydb.table.Table.count`.
A :class:`~yawning_titan.db.query.YawningTitanQuery` can be used to
filter the count.
:param cond: An optional :class:`~yawning_titan.db.query.YawningTitanQuery`.
Has a default value of ``None``.
:return: The number of docs counted.
"""
if cond:
return self._db.count(cond)
return len(self._db.all())
[docs] def update(
self,
network: Network,
name: Optional[str] = None,
description: Optional[str] = None,
author: Optional[str] = None,
) -> Network:
"""
Update a :class:`~yawning_titan.networks.network.Network`. in the db.
:param network: An instance of :class:`~yawning_titan.networks.network.Network`.
:param name: The config name.
:param description: The config description.
:param author: The config author.
:return: The updated :class:`~yawning_titan.networks.network.Network`.
"""
# Update the configs metadata
network.doc_metadata.update(name, description, author)
# Perform the update and retrieve the returned doc
doc = self._db.update(
network.to_dict(json_serializable=True),
network.doc_metadata.uuid,
name,
description,
author,
)
if doc:
# Update the configs metadata created at
network.doc_metadata.updated_at = doc["_doc_metadata"]["updated_at"]
return network
[docs] def upsert(
self,
network: Network,
name: Optional[str] = None,
description: Optional[str] = None,
author: Optional[str] = None,
) -> Network:
"""
Upsert a :class:`~yawning_titan.networks.network.Network`. in the db.
:param network: An instance of :class:`~yawning_titan.networks.network.Network`.
:param name: The config name.
:param description: The config description.
:param author: The config author.
:return: The upserted :class:`~yawning_titan.networks.network.Network`.
"""
network.doc_metadata.update(name, description, author)
doc = self._db.upsert(
network.to_dict(json_serializable=True),
network.doc_metadata.uuid,
name,
description,
author,
)
# Update the configs metadata created at
if doc and "updated_at" in doc["_doc_metadata"]:
network.doc_metadata.updated_at = doc["_doc_metadata"]["updated_at"]
return network
[docs] def remove(self, network: Network) -> Union[str, None]:
"""
Remove a :class:`~yawning_titan.networks.network.Network`. from the db.
:param network: An instance of :class:`~yawning_titan.networks.network.Network`.
:return: The uuid of the removed :class:`~yawning_titan.networks.network.Network`.
"""
return self._db.remove(network.doc_metadata.uuid)
[docs] def remove_by_cond(self, cond: QueryInstance) -> List[str]:
"""
Remove :class:`~yawning_titan.networks.network.Network`. from the db that match the query.
:param cond: A :class:`~yawning_titan.db.query.YawningTitanQuery`.
:return: The list of uuids of the removed :class:`~yawning_titan.networks.network.Network`.
"""
return self._db.remove_by_cond(cond)
[docs] def reset_default_networks_in_db(self, force=False):
"""
Reset the default network in the db.
Achieves this by loading the default
`yawning_titan/networks/_package_data/network.json` db file into TinyDB,
then iterating over all docs and forcing an update of each one in the main
networks db from its uuid if they do not match.
:param force: Forces a reset without checking for equality when
True. Has a default value of False.
"""
# Obtain the path to the default db file in package data
self._db.db.clear_cache()
network_root = Path(__file__).parent.resolve()
default_network_path = os.path.join(
network_root, "_package_data", "network.json"
)
# Load the default db file into TinyDB
default_db = TinyDB(default_network_path)
# Iterate over all default networks, and force an update in the
# main NetworkDB by uuid.
for package_data_network in default_db.all():
uuid = package_data_network["_doc_metadata"]["uuid"]
name = package_data_network["_doc_metadata"]["name"]
# Get the matching network from the networks db
try:
db_network = self.get(uuid)
except KeyError:
db_network = None
# If the network doesn't match the default, or it doesn't exist,
# perform an upsert.
if not force and db_network:
reset = False
else:
reset = True
if reset:
self._db.db.upsert(package_data_network, DocMetadataSchema.UUID == uuid)
_LOGGER.info(
f"Reset default network '{name}' in the "
f"{self._db.name} db with uuid='{uuid}'."
)
# Clear the default db cache and close the file.
default_db.clear_cache()
default_db.close()
[docs] def rebuild_db(self):
"""
Rebuild the db.
Actions taken:
- clear the query cache
- truncate the db
- call :func:`~yawning_titan.networks.network_db.NetworkDB.reset_default_networks_in_db`
.. warning::
This function completely rebuilds the database. Any custom networks
saved in the db will be lost. The default networks can be reset
using the :func:`~yawning_titan.networks.network_db.NetworkDB.reset_default_networks_in_db`
function.
"""
_LOGGER.info(f"Rebuilding the {self._db.name} db.")
self._db.db.clear_cache()
self._db.db.truncate()
self.reset_default_networks_in_db()
[docs]def default_18_node_network() -> Network:
"""
The standard 18-node network found in the Ridley, A. (2017) research paper.
.. see also:: https://www.nsa.gov/portals/70/documents/resources/everyone/digital-media-center/publications/the-next-wave/TNW-22-1.pdf#page=9
:return: An instance of :class:`~yawning_titan.networks.network.Network`.
"""
with NetworkDB() as db:
return db.get("b3cd9dfd-b178-415d-93f0-c9e279b3c511")
[docs]def dcbo_base_network() -> Network:
"""
Creates the same network used to generated DCBO data.
.. node::
This function replaces the network that was defined in
`yawning_titan/integrations/dcbo/base_net.txt`.
:return: An instance of :class:`~yawning_titan.networks.network.Network`.
"""
with NetworkDB() as db:
return db.get("47cb9f49-b53d-44f8-9a7b-3d74cf2ec1b0")