Source code for yawning_titan.config.core

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Hashable, List, Optional, Union

import yaml

from yawning_titan.exceptions import (
    ConfigGroupValidationError,
    ConfigItemValidationError,
)
from yawning_titan.game_modes.components import _LOGGER

# Replace ``ignore_aliases`` on PyYAML's default ``Dumper`` with a callable that
# always returns True, so ``yaml.dump`` writes repeated objects out in full
# instead of emitting ``&anchor`` / ``*alias`` references.
# NOTE(review): this patches ``yaml.Dumper`` only; ``yaml.safe_dump`` uses
# ``yaml.SafeDumper`` and is presumably unaffected -- confirm if alias-free
# output is also expected there.
yaml.Dumper.ignore_aliases = lambda *args: True


class ConfigBase(ABC):
    """Used to provide helper methods to represent a ConfigGroup object."""

    def get_config_elements(
        self,
        types: Optional[
            Union[ConfigItem, ConfigGroup, List[Union[ConfigItem, ConfigGroup]]]
        ] = None,
    ) -> Dict[str, Union[ConfigItem, ConfigGroup]]:
        """
        Get the attributes of the class that are either :class:`ConfigGroup` or :class:`ConfigItem`.

        Only instance attributes whose name does not start with an underscore
        are considered.

        :param types: An optional type, or list of types, to filter on. When omitted,
            both :class:`ConfigItem` and :class:`ConfigGroup` instances are returned.
        :return: A dictionary of names to config elements.
        """
        if types is None:
            types = (ConfigItem, ConfigGroup)
        elif isinstance(types, list):
            # isinstance() accepts a tuple of types but not a list.
            types = tuple(types)
        return {
            name: attr
            for name, attr in self.__dict__.items()
            if isinstance(attr, types) and not name.startswith("_")
        }

    def get_non_config_elements(self) -> Dict[str, Any]:
        """
        Get all attributes of the class that are not :class:`ConfigGroup` or :class:`ConfigItem`.

        Attributes whose name starts with an underscore are excluded.

        :return: A dictionary of names to attributes.
        """
        # Resolve the config element names once instead of once per attribute.
        config_names = self.get_config_elements().keys()
        return {
            name: attr
            for name, attr in self.__dict__.items()
            if name not in config_names and not name.startswith("_")
        }

    @property
    def config_items(self):
        """Property to represent the :class:`~yawning_titan.config.core.ConfigItem` children of the group."""
        return self.get_config_elements(ConfigItem)

    @property
    def config_groups(self):
        """Property to represent the :class:`~yawning_titan.config.core.ConfigGroup` children of the group."""
        return self.get_config_elements(ConfigGroup)

    def stringify(self):
        """Represent the class as a string.

        Config elements are rendered through their own ``stringify`` method,
        followed by the remaining public attributes.

        :return: A string.
        """
        parts = [
            f"{name}={val.stringify()}"
            for name, val in self.get_config_elements().items()
        ]
        parts.extend(
            f"{name}={val}" for name, val in self.get_non_config_elements().items()
        )
        return f"{self.__class__.__name__}(" + ", ".join(parts) + ")"

    def __repr__(self) -> str:
        """Return the result of :meth:`ConfigBase.stringify`."""
        return self.stringify()

    def __str__(self) -> str:
        """Return the result of :meth:`ConfigBase.stringify`."""
        return self.stringify()

    def __hash__(self) -> int:
        """Generate a hash from the instance's elements and public attributes.

        NOTE(review): values that are not hashable are silently left out of the
        hash, and a ``dict`` attribute contributes its keys only (``tuple(dict)``).
        Because :meth:`__eq__` compares hashes, such data is also ignored when
        testing equality.
        """
        element_hash = [v.stringify() for v in self.get_config_elements().values()]
        element_hash.extend(
            tuple(v) if isinstance(v, (list, dict, set)) else v
            for v in self.get_non_config_elements().values()
        )
        return hash(tuple(v for v in element_hash if isinstance(v, Hashable)))

    def __eq__(self, other) -> bool:
        """Check the equality of any 2 instances of class.

        :param other: Another potential instance of the class to be compared against.
        :return: A boolean True if the elements holds the same data otherwise False.
        """
        if isinstance(other, self.__class__):
            return hash(self) == hash(other)
        return False
@dataclass()
class ItemTypeProperties(ABC):
    """An Abstract Base Class that is inherited by config data type properties."""

    _allowed_types: List[type] = None
    """The allowed data types for the item."""
    allow_null: Optional[bool] = None
    """`True` if the config _value can be left empty, otherwise `False`."""
    default: Optional[Any] = None
    """The items default value."""

    def __post_init__(self):
        """Validate the default value, raising the first validation error if it is invalid."""
        # Compare against None so that falsy defaults (0, False, "") are validated too.
        if self.default is not None:
            validated_default = self.validate(self.default)
            if not validated_default.passed:
                raise validated_default.fail_exceptions[0]

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the properties as a dict.

        Attributes that are ``None`` or whose name starts with an underscore are left out.

        :return: A dict.
        """
        return {
            k: v
            for k, v in self.__dict__.items()
            if v is not None and not k.startswith("_")
        }

    def validate(self, val) -> ConfigItemValidation:
        """Perform the base validation checks common to all `ConfigItem` elements.

        These checks include:
        - Check that the value is not null if :attr:`allow_null` is False
        - Check that the type of the value is in :attr:`_allowed_types`

        :param val: The value to validate.
        :return: A :class:`ConfigItemValidation` holding any failures found.
        """
        validation = ConfigItemValidation()

        if not self.allow_null and val is None:
            msg = f"Value {val} when allow_null is not permitted."
            validation.add_validation(msg, ConfigItemValidationError(msg))

        # The check is an exact type match (not isinstance), so e.g. a bool is
        # not accepted where only int is allowed. When no allowed types have
        # been declared there is nothing to check against.
        if (
            val is not None
            and self._allowed_types is not None
            and type(val) not in self._allowed_types
        ):
            msg = (
                f"Value {val} is of type {type(val)}, should be "
                + " or ".join(map(str, self._allowed_types))
                + "."
            )
            validation.add_validation(msg, ConfigItemValidationError(msg))

        return validation
class ConfigValidationBase(ConfigBase):
    """The base validation methods for a config element."""

    def __init__(
        self,
        fail_reasons: Optional[Union[List[str], str]] = None,
        fail_exceptions: Optional[
            Union[
                List[Union[ConfigGroupValidationError, ConfigItemValidationError]],
                Union[ConfigGroupValidationError, ConfigItemValidationError],
            ]
        ] = None,
    ):
        """The ConfigValidationBase constructor.

        :param fail_reasons: A single message or a list of messages describing validation failures.
        :param fail_exceptions: A single exception or a list of exceptions matching the failures.
        """
        if isinstance(fail_reasons, list):
            self.fail_reasons: List[str] = fail_reasons
        elif isinstance(fail_reasons, str):
            self.fail_reasons: List[str] = [fail_reasons]
        else:
            self.fail_reasons: List[str] = []

        if isinstance(fail_exceptions, list):
            self.fail_exceptions: List[
                Union[ConfigGroupValidationError, ConfigItemValidationError]
            ] = fail_exceptions
        elif isinstance(fail_exceptions, BaseException):
            # Wrap a single exception. This must test ``fail_exceptions`` itself:
            # testing ``fail_reasons`` would drop the exception or store [None].
            self.fail_exceptions: List[
                Union[ConfigGroupValidationError, ConfigItemValidationError]
            ] = [fail_exceptions]
        else:
            self.fail_exceptions: List[
                Union[ConfigGroupValidationError, ConfigItemValidationError]
            ] = []

    def add_validation(
        self,
        fail_reason: str,
        exception: Union[ConfigGroupValidationError, ConfigItemValidationError],
    ):
        """
        Add a validation fail_reason, exception pair to their respective lists.

        Additionally check that no such error already exists.

        :param fail_reason: A string message to describe a particular error.
        :param exception: A wrapped `Exception` object that can be used to raise an error for the `fail_reason`.
        """
        if fail_reason not in self.fail_reasons:
            self.fail_reasons.append(fail_reason)
        if exception not in self.fail_exceptions:
            self.fail_exceptions.append(exception)

    def stringify(self):
        """Represent the class as a string.

        The ``passed`` flag is listed first, followed by the public attributes.

        :return: A string.
        """
        parts = [f"passed={self.passed}"]
        parts.extend(
            f"{name}={val}" for name, val in self.get_non_config_elements().items()
        )
        return f"{self.__class__.__name__}(" + ", ".join(parts) + ")"

    @property
    @abstractmethod
    def passed(self) -> bool:
        """
        Returns True if there are no :attr:`fail_reasons` or :attr:`fail_exceptions`.

        :return: A bool.
        """
        pass
class ConfigItemValidation(ConfigValidationBase):
    """Validation result for a single config item, built on :class:`ConfigValidationBase`."""

    @property
    def passed(self) -> bool:
        """
        Whether the item validated cleanly.

        :return: True when both :attr:`fail_reasons` and :attr:`fail_exceptions` are empty,
            otherwise False.
        """
        no_exceptions = not self.fail_exceptions
        no_reasons = not self.fail_reasons
        return no_exceptions and no_reasons
class ConfigGroupValidation(ConfigValidationBase):
    """
    Used to return a validation result for a group of dependant config items, and the list of item validations.

    If validation fails, a reason why and any exception raised are returned.
    """

    def __init__(
        self,
        fail_reasons: Optional[Union[List[str], str]] = None,
        fail_exceptions: Optional[
            Union[
                List[Union[ConfigGroupValidationError, ConfigItemValidationError]],
                Union[ConfigGroupValidationError, ConfigItemValidationError],
            ]
        ] = None,
    ):
        """The ConfigGroupValidation constructor.

        :param fail_reasons: A single message or a list of messages describing group level failures.
        :param fail_exceptions: A single exception or a list of exceptions matching the failures.
        """
        # Maps element names to the validation result of that element.
        self._element_validation = {}
        super().__init__(fail_reasons, fail_exceptions)

    def add_element_validation(
        self,
        element_name: str,
        validation: Union[ConfigItemValidation, ConfigGroupValidation],
    ):
        """
        Add a :class:`ConfigItemValidation` or :class:`ConfigGroupValidation` to the item validation dict.

        An existing entry with the same name is replaced.

        :param element_name: The name of the element.
        :param validation: the instance of ConfigItemValidation.
        """
        self._element_validation[element_name] = validation

    def to_dict(self, element_name: str = "root", root: bool = True) -> dict:
        """
        Express the error tree as a dictionary.

        :param element_name: A string name for the element to be represented.
        :param root: Whether this is the top level call. When True the result is
            wrapped as ``{element_name: ...}`` and a fully passing tree is reported
            as ``{element_name: "Passed"}``.
        :return: A dict of element names to validation errors or validation dictionaries.
        """
        d = {} if self.passed else {"group": self.fail_reasons or "None"}
        for name, validation in self.element_validation.items():
            # A group validation's ``passed`` already includes its own
            # ``group_passed`` state, so a single check covers both element kinds.
            if validation.passed:
                continue
            if isinstance(validation, ConfigGroupValidation):
                d[name] = validation.to_dict(name, False)
            else:
                d[name] = validation.fail_reasons
        if root:
            return {element_name: d if d else "Passed"}
        return d

    def log(self, element_name: str = "root") -> str:
        """
        Print the validation results as a formatted string and return that string.

        :param element_name: A string name for the element to be represented.
        :return: The formatted string that was printed.
        """
        string = "\nValidation results\n------------------\n"
        d = self.to_dict(element_name)
        if d:
            string += yaml.dump(d, sort_keys=False, default_flow_style=False)
        else:
            string += "Passed"
        print(string)
        # Returned as well as printed so callers can reuse the text, e.g. as an
        # exception message.
        return string

    @property
    def passed(self) -> bool:
        """
        Returns True if the group validation has passed and all element validation has passed.

        :return: A bool.
        """
        return self.elements_passed and self.group_passed

    @property
    def group_passed(self) -> bool:
        """
        Returns True if there are no :attr:`fail_reasons` or :attr:`fail_exceptions`.

        :return: A bool.
        """
        return not (self.fail_exceptions or self.fail_reasons)

    @property
    def element_validation(
        self,
    ) -> Dict[str, Union[ConfigItemValidation, ConfigGroupValidation]]:
        """
        The dict of element to :class:`ConfigItemValidation` and :class:`ConfigGroupValidation` validations.

        :return: A dict.
        """
        return self._element_validation

    @property
    def elements_passed(self) -> bool:
        """
        Returns True if all items passed validation, otherwise returns False.

        :return: A bool.
        """
        return all(v.passed for v in self.element_validation.values())
@dataclass
class ConfigItem:
    """The ConfigItem class holds an items value, doc, and properties."""

    value: object
    """The items value."""
    doc: Optional[str] = None
    """The items doc."""
    alias: str = field(default=None, repr=False)
    """The alias of the config item, i.e. its representation from the original config."""
    depends_on: List[str] = field(default_factory=list, repr=False)
    """A list of :class:`ConfigItem`'s upon which this item depends. If these items are set so must this item be."""
    properties: Optional[ItemTypeProperties] = None
    """The items properties."""
    validation: ConfigItemValidation = None
    """The instance of ConfigItemValidation that provides access to the item validation details."""

    def __post_init__(self):
        """Fall back to the default from the properties when no value was given, then validate."""
        props = self.properties
        # ``properties`` is optional, and a default may legitimately be falsy
        # (0, False, ""), so both are compared against None explicitly.
        if self.value is None and props is not None and props.default is not None:
            # Bypass __setattr__ here; the validate() call below covers it.
            self.set_value(props.default)
        self.validate()

    def __setattr__(self, __name: str, __value: Any) -> None:
        """
        Set an attribute of the :class:`ConfigItem` if the value is to be set, call the validation method.

        :param __name: the name of the attribute to be set
        :param __value: the value to set the attribute to
        """
        self.__dict__[__name] = __value
        if __name == "value":
            self.validate()

    def to_dict(
        self,
        as_key_val_pair: Optional[bool] = False,
        values_only: Optional[bool] = False,
        include_none: Optional[bool] = True,
    ) -> Any:
        """
        Return the ConfigItem as a dict.

        :param as_key_val_pair: If true, the dict is returned as a value in a key/value pair,
            the key being the class name.
        :param values_only: If true, the bare value is returned instead of a dict.
        :param include_none: If false and the value is ``None``, ``None`` is returned.
        :return: The ConfigItem as a dict, or the bare value / ``None`` as described above.
        """
        if not include_none and self.value is None:
            return None
        if values_only:
            return self.value
        d = {"value": self.value}
        if self.doc:
            d["doc"] = self.doc
        if self.properties:
            d["properties"] = self.properties.to_dict()
        if as_key_val_pair:
            return {self.__class__.__name__: d}
        return d

    def validate(self) -> ConfigItemValidation:
        """
        Validate the item against its properties.

        If no properties exist, simply return a default passed :class:`ConfigItemValidation`.
        The result is also stored on :attr:`validation`.

        :return: An instance of :class:`ConfigItemValidation`.
        """
        if self.properties:
            self.validation = self.properties.validate(self.value)
        else:
            self.validation = ConfigItemValidation()
        return self.validation

    def set_value(self, value: Any) -> None:
        """
        Set the value of the :class:`ConfigItem` bypassing the validation.

        :param value: The value to be set.
        """
        # Writing to __dict__ directly avoids the validating __setattr__.
        self.__dict__["value"] = value

    def stringify(self) -> Any:
        """This is here to allow stringify methods to be call on both :class:`ConfigItem` and :class:`ConfigGroup` classes."""
        return self.value
class ConfigGroup(ConfigBase, ABC):
    """The ConfigGroup class holds a ConfigItem's, doc, properties, and a ConfigItemValidation."""

    def __init__(self, doc: Optional[str] = None):
        """The ConfigGroup constructor.

        :param doc: The groups doc.
        """
        self.doc: Optional[str] = doc
        self.validation = self.validate()

    def __setattr__(self, __name: str, __value: Any) -> None:
        """
        Set an attribute of the group.

        When the attribute currently holds a :class:`ConfigItem` and the new value is not
        itself a :class:`ConfigItem`, the value is assigned to the item's ``value``
        instead of replacing the item.

        :param __name: the name of the attribute to be set
        :param __value: the value to set the attribute to
        """
        current = getattr(self, __name, None)
        if isinstance(current, ConfigItem) and not isinstance(__value, ConfigItem):
            current.value = __value
        else:
            self.__dict__[__name] = __value

    def validate(
        self, raise_overall_exception: Optional[bool] = False
    ) -> ConfigGroupValidation:
        """
        Validate the grouped items against their properties.

        :param raise_overall_exception: If true, print the validation results and raise
            when validation did not pass.
        :raises ConfigGroupValidationError: If ``raise_overall_exception`` is true and the
            validation failed.
        :return: An instance of :class:`ConfigGroupValidation`.
        """
        self.validation = ConfigGroupValidation()
        self.validate_elements()
        if raise_overall_exception and not self.validation.passed:
            # log() prints the results; the message is built here from the
            # error tree so the exception always carries a readable message.
            self.validation.log()
            raise ConfigGroupValidationError(
                yaml.dump(
                    self.validation.to_dict(),
                    sort_keys=False,
                    default_flow_style=False,
                )
            )
        return self.validation

    def validate_elements(self):
        """Call the .validate() method on each of the elements in the group."""
        for name, element in self.get_config_elements().items():
            self.validation.add_element_validation(name, element.validate())

    def to_dict(
        self,
        values_only: Optional[bool] = False,
        legacy: Optional[bool] = False,
        include_none: Optional[bool] = True,
    ):
        """
        Return the ConfigGroup as a dict.

        :param values_only: Create a dictionary containing only the value of :class:`ConfigItem`'s
        :param legacy: Convert the group into a unitary depth dictionary of legacy config value (aliases)
            to :class:`ConfigItem`'s by calling :meth:`ConfigGroup.to_legacy_dict`.
        :param include_none: If false, elements whose dict representation is ``None`` are left out.
        :return: The ConfigGroup as a dict.
        """
        if legacy:
            return self.to_legacy_dict()

        element_dict = {}
        for name, element in self.get_config_elements().items():
            d = element.to_dict(values_only=values_only, include_none=include_none)
            if include_none or d is not None:
                element_dict[name] = d

        if values_only:
            return element_dict
        if self.doc is not None:
            return {"doc": self.doc, **element_dict}
        return element_dict

    def to_legacy_dict(
        self, flattened_dict: Dict[str, Any] = None
    ) -> Dict[str, ConfigItem]:
        """Convert the group into a unitary depth dictionary of legacy config value (aliases) to :class:`ConfigItem`'s.

        :param flattened_dict: The dictionary to add the items to; a new one is created when omitted.
        :return: a dictionary
        """
        if flattened_dict is None:
            flattened_dict = {}
        for element in self.get_config_elements().values():
            if isinstance(element, ConfigItem):
                flattened_dict[element.alias] = element
            else:
                # Nested groups add their items to the same dictionary in place.
                element.to_legacy_dict(flattened_dict)
        return flattened_dict

    def to_yaml(self, file_path: str):
        """
        Save the values of the elements of the group to a .yaml file.

        :param file_path: The path to the .yaml file
        """
        with open(file_path, "w") as file:
            yaml.safe_dump(
                self.to_dict(values_only=True),
                file,
                sort_keys=False,
                default_flow_style=False,
            )

    def set_from_dict(
        self,
        config_dict: dict,
        legacy: bool = False,
        infer_legacy: bool = False,
        **kwargs,
    ):
        """
        Set the values of all :class:`ConfigGroup` or :class:`ConfigItem` elements.

        :param config_dict: A dictionary representing values of all config elements.
        :param legacy: Whether to use the alias names for config elements to construct the config from a legacy dictionary.
        :param infer_legacy: Attempt to recognise if a config is of a legacy type. When set, the
            inferred result replaces the value of ``legacy``.

        kwargs can contain 2 parameters:
        - root: Whether the element is a base level element or not.
        if the element is a root then it should validate all of its descendants.
        - legacy_lookup: The current flattened dictionary representation of the class by its legacy keys.
        """
        _root = kwargs.get("root", True)
        # ``_legacy_lookup`` is still accepted because earlier recursive calls
        # passed the lookup under that name.
        _legacy_lookup = kwargs.get("legacy_lookup", kwargs.get("_legacy_lookup"))

        if infer_legacy:
            legacy = all(k in config_dict for k in ("RED", "BLUE", "OBSERVATION_SPACE"))

        if legacy:
            if _legacy_lookup is None:
                _legacy_lookup = self.to_legacy_dict()
            for element_name, v in config_dict.items():
                element: ConfigItem = _legacy_lookup.get(element_name)
                if isinstance(v, dict):
                    # Pass the lookup under the key that is read above so it is
                    # built once rather than at every nesting level.
                    self.set_from_dict(
                        v, legacy=True, root=False, legacy_lookup=_legacy_lookup
                    )
                if element is not None:
                    element.set_value(v)
        else:
            for element_name, v in config_dict.items():
                element = getattr(self, element_name, None)
                if isinstance(v, dict) and isinstance(element, ConfigGroup):
                    element.set_from_dict(v, False)
                elif not isinstance(v, dict) and isinstance(element, ConfigItem):
                    element.set_value(v)
                else:
                    setattr(self, element_name, v)

        if _root:
            self.validate()

    def set_from_yaml(
        self, file_path: str, legacy: bool = False, infer_legacy: bool = False
    ):
        """
        Set the elements of the group from a .yaml file.

        :param file_path: The path to the .yaml file.
        :param legacy: Whether to use the alias names for config elements to construct the config from a legacy dictionary.
        :param infer_legacy: Attempt to recognise if a config is of a legacy type.
        :raises FileNotFoundError: If ``file_path`` does not exist.
        """
        try:
            with open(file_path) as f:
                config_dict = yaml.safe_load(f)
        except FileNotFoundError as e:
            msg = f"Configuration file does not exist: {file_path}"
            _LOGGER.critical(msg, exc_info=True)
            raise e
        if config_dict is None:
            # yaml.safe_load returns None for an empty document; treat it as
            # an empty config instead of failing on ``None.items()``.
            config_dict = {}
        self.set_from_dict(config_dict, legacy=legacy, infer_legacy=infer_legacy)