Source code for ironic.common.trait_based_networking.base

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from abc import abstractmethod
from dataclasses import dataclass
from dataclasses import fields
from dataclasses import MISSING
import enum
from typing import ClassVar

import ironic.common.exception as exc
from ironic.common.i18n import _
import ironic.common.trait_based_networking.grammar.parser as tbn_parser


[docs] class Operator(enum.Enum): """A FilterExpression Operator Represents a boolean operator (AND, OR) between two expressions in a filter expression. """ AND = "&&" OR = "||"
[docs] def eval(self, variable, value): # NOTE(clif): These can operate on string values, and return the values # themselves instead of a boolean! match self.name: case self.AND.name: return variable and value case self.OR.name: return variable or value
def __str__(self): return self.value
[docs] class Comparator(enum.Enum): """A FilterExpression Comparator Comparators test mathematical-esque relations between a variable and a string. """ EQUALITY = "==" INEQUALITY = "!=" GT_OR_EQ = ">=" GT = ">" LT_OR_EQ = "<=" LT = "<" PREFIX_MATCH = "=~"
[docs] def eval(self, variable, value): # TODO(clif): Should we some sort of checking of variable type vs # requested operator? match self.name: case self.EQUALITY.name: return variable == value case self.INEQUALITY.name: return variable != value case self.GT_OR_EQ.name: return variable >= value case self.GT.name: return variable > value case self.LT_OR_EQ.name: return variable <= value case self.LT.name: return variable < value case self.PREFIX_MATCH.name: if isinstance(variable, str): return variable.startswith(value) raise exc.TBNComparatorPrefixMatchTypeMismatch( _("Prefix match can only be used with variables " "of type string") )
def __str__(self): return self.value
[docs] class Actions(enum.Enum): """Represents actions recognized by Trait Based Networking""" ATTACH_PORT = "attach_port" ATTACH_PORTGROUP = "attach_portgroup" BOND_PORTS = "bond_ports" GROUP_AND_ATTACH_PORTS = "group_and_attach_ports"
[docs] class Variables(enum.Enum): """Represents a variable (or function) used by FilterExpressions Values of variables are drawn from network-related objects like ports, portgroups, or networks (aka vifs). """ NETWORK_NAME = "network.name" NETWORK_TAGS = "network.tags" PORT_ADDRESS = "port.address" PORT_CATEGORY = "port.category" PORT_IS_PORT = "port.is_port" PORT_IS_PORTGROUP = "port.is_portgroup" PORT_PHYSICAL_NETWORK = "port.physical_network" PORT_VENDOR = "port.vendor"
[docs] def object_name(self): return str(self).split(".")[0]
[docs] def attribute_name(self): return str(self).split(".")[1]
def __str__(self): return self.value
def _retrieve_attribute(attribute_name, tbn_obj): """Helper method to get an attribute from a TBN related object""" attribute = getattr(tbn_obj, attribute_name, None) if attribute is None: raise exc.TBNAttributeRetrievalException(attr_name=attribute_name) return attribute # Allows special case FilterExpression evaluations where Port does not # matter. In these cases a port with this name will always evaluate to # True. UNIVERSAL_PORT_CATEGORY = "__UNIVERSAL_PORT" def _is_universal_port(port): """Check if the port is a special Port that always passes filter""" return isinstance(port, Port) \ and port.category == UNIVERSAL_PORT_CATEGORY # Allows special case FilterExpression evaluations where Network does not # matter. In these cases a network with this name will always evaluate to # True. UNIVERSAL_NETWORK_NAME = "__UNIVERSAL_NETWORK" def _is_universal_network(network): """Check if the network is a special Network that always passes filter""" return isinstance(network, Network) \ and network.name == UNIVERSAL_NETWORK_NAME def _is_universal_tbn_obj(tbn_obj): """Check if the TBN object is a special one that always passes filter""" return _is_universal_port(tbn_obj) or _is_universal_network(tbn_obj)
[docs] class FunctionExpression(object): """A callable function from within a FilterExpression Used to query objects to determine if they pass a FilterExpression or not. """ def __init__(self, variable): self._variable = variable
[docs] def eval(self, port, network): tbn_obj = port if self._variable.object_name() == "port" else network if _is_universal_tbn_obj(tbn_obj): return True attr_name = self._variable.attribute_name() attr_func = _retrieve_attribute(attr_name, tbn_obj) return attr_func()
def __str__(self): return f"{self._variable}"
[docs] class SingleExpression(object): """A single expression from within a FilterExpression A single expression consists of a variable name, a comparator, and a string literal. For example: port.vendor == "purple" In this example, When eval()ed against a port whose vendor is "purple" this expression will return True. Otherwise the expression will return False. """ def __init__(self, variable, comparator, literal): self._variable = variable self._comparator = comparator self._literal = literal
[docs] def eval(self, port, network): tbn_obj = port if self._variable.object_name() == "port" else network if _is_universal_tbn_obj(tbn_obj): return True attr_name = self._variable.attribute_name() attribute = _retrieve_attribute(attr_name, tbn_obj) return self._comparator.eval(attribute, self._literal)
def __str__(self): return f"{self._variable} {self._comparator} '{self._literal}'"
[docs] class CompoundExpression(object): """A compound expression found within a FilterExpression A compound expression consists of a left-hand expression and a right-hand expression joined by a boolean operator. """ def __init__(self, left_expression, operator, right_expression): self._left_expression = left_expression self._operator = operator self._right_expression = right_expression
[docs] def eval(self, port, network): left_result = self._left_expression.eval(port, network) right_result = self._right_expression.eval(port, network) match self._operator: case Operator.OR: return left_result or right_result case Operator.AND: return left_result and right_result
def __str__(self): return (f"{self._left_expression} {self._operator} " f"{self._right_expression}")
[docs] class ParenExpression(object): """Represents an parentheses expression found in a FilterExpression Aids in logically grouping and evaluating expressions before others. """ def __init__(self, expression): self._expression = expression
[docs] def eval(self, port, network): return self._expression.eval(port, network)
def __str__(self): return f"({self._expression})"
[docs] class FilterExpression(object): """Encompasses filters found in TraitActions Used to filter (port, network) pairs to apply actions which pass the filter. Use FilterExpression.parse to transform a string containing a grammatically correct expression into a fully parsed FilterExpression object. See FILTER_EXPRESSION_GRAMMAR in ironic.common.trait_based_networking.grammar.parser for full understanding of how these expressions are parsed. """ def __init__(self, expression): self._expression = expression
[docs] def eval(self, port, network): return self._expression.eval(port, network)
def __str__(self): return f"{self._expression}"
[docs] @classmethod def parse(cls, expression): tree = tbn_parser.FilterExpressionParser.parse(expression) return tbn_parser.FilterExpressionTransformer().transform(tree)
def __eq__(self, other): return str(self) == str(other)
DEFAULT_GROUP_AND_ATTACH_MIN_COUNT: int = 2
[docs] @dataclass(frozen=True) class TraitAction: """An action defined by a NetworkTrait Each action contains a filter (FilterExpression) that determines which (port, network) pairs the action can apply to. """ NECESSARY_KEYS: ClassVar[list[str]] ALL_KEYS: ClassVar[list[str]] trait_name: str action: Actions filter: 'FilterExpression' min_count: int | None = None max_count: int | None = None
[docs] def matches(self, portlike, network): """Check if filter expression matches the port, network pairing.""" return self.filter.eval(portlike, network)
[docs] def validate(self): """Check that the action is valid.""" match self.action: case Actions.GROUP_AND_ATTACH_PORTS: if self.min_count is None \ or self.min_count < DEFAULT_GROUP_AND_ATTACH_MIN_COUNT: return (False, _(f"{self.action.value} must have a min_count of " f"{DEFAULT_GROUP_AND_ATTACH_MIN_COUNT} or " f"greater. Got '{self.min_count}'.")) if self.max_count is not None and \ self.max_count < self.min_count: return (False, _(f"{self.action.value} must have a max_count " "greater or equal to it's min_count. min_count " f"is '{self.min_count}' while max_count is " f"'{self.max_count}'.")) case _: return (True, "") return (True, "")
# Keys from the config file action dict (excludes trait_name, which comes # from the parent trait key rather than the action dict itself). TraitAction.NECESSARY_KEYS = [ f.name for f in fields(TraitAction) if f.name != 'trait_name' and f.default is MISSING and f.default_factory is MISSING ] TraitAction.ALL_KEYS = [ f.name for f in fields(TraitAction) if f.name != 'trait_name' ]
[docs] class NetworkTrait(object): """Represents an entire Trait for Trait Based Networking Each trait can have many actions. Traits can be ordered explicitly if so desired. """ def __init__(self, name, actions, order=1): """Init a NetworkTrait :param name: The named of the trait :param actions: A list of TraitActions which belong to this trait :param order: An optional integer used to determine the explicit ordering of application of traits. Used to sort and apply traits in ascending order. """ self.name = name self.actions = actions self.order = order def __eq__(self, other): if self.name != other.name: return False for action in self.actions: match_found = False for other_action in other.actions: if action == other_action: match_found = True break if not match_found: return False return self.order == other.order
[docs] @dataclass(frozen=True) class PrimordialPort: """A set of common attributes belonging to both Ports and Portgroups""" id: int uuid: str address: str category: str | None physical_network: str | None vendor: str | None
[docs] @dataclass(frozen=True) class Port(PrimordialPort): """A Port used internally to query and match to TraitActions""" available_for_dynamic_portgroup: bool
[docs] @classmethod def from_ironic_port(cls, ironic_port): return cls( id=ironic_port.id, uuid=ironic_port.uuid, address=ironic_port.address, category=ironic_port.category, physical_network=ironic_port.physical_network, vendor=ironic_port.vendor, available_for_dynamic_portgroup=\ ironic_port.available_for_dynamic_portgroup )
[docs] @classmethod def universal_port(cls): return cls( id=0, uuid="UNIVERSAL", address="UNIVERSAL", category=UNIVERSAL_PORT_CATEGORY )
[docs] def is_port(self): return True
[docs] def is_portgroup(self): return False
[docs] @dataclass(frozen=True) class Portgroup(PrimordialPort): """A Portgroup used internally to query and match to TraitActions""" dynamic_portgroup: bool
[docs] @classmethod def from_ironic_portgroup(cls, ironic_portgroup): return cls( id=ironic_portgroup.id, uuid=ironic_portgroup.uuid, address=ironic_portgroup.address, category=ironic_portgroup.category, physical_network=ironic_portgroup.physical_network, vendor=ironic_portgroup.vendor, dynamic_portgroup=ironic_portgroup.dynamic_portgroup )
[docs] def is_port(self): return False
[docs] def is_portgroup(self): return True
[docs] @dataclass(frozen=True) class Network: """A Network (aka vif) Used to match against TraitAction FilterExpressions """ id: str name: str tags: frozenset[str]
[docs] @classmethod def from_vif_info(cls, vif_info): """Helper method to create Networks from vif_info dictionaries""" return cls(vif_info['id'], # vif_info is guaranteed to have 'id'. vif_info.get('name'), vif_info.get('tags'))
[docs] @classmethod def universal_network(cls): return cls( id=0, name=UNIVERSAL_NETWORK_NAME, tags=[] )
[docs] @dataclass(frozen=True) class RenderedAction: """A base class for Actions which are ready to apply""" trait_action: TraitAction node_uuid: str
[docs] @dataclass(frozen=True) class AttachAction(RenderedAction): """Base class for actions which will attach objects to networks"""
[docs] @abstractmethod def portlike_uuid(self): ...
[docs] @abstractmethod def get_portlike_object(self, task): ...
[docs] @dataclass(frozen=True) class AttachPort(AttachAction): """Attach a port to a network Contains all the necessary information to attach a port to a network (vif) """ port_uuid: str network_id: str
[docs] def get_portlike_object(self, task): for port in task.ports: if port.uuid == self.port_uuid: return port return None
[docs] def portlike_uuid(self): return self.port_uuid
def __str__(self): return _(f"Attach port '{self.port_uuid}' on node " f"'{self.node_uuid}' to network '{self.network_id}' " f"via trait {self.trait_action.trait_name}")
[docs] @dataclass(frozen=True) class AttachPortgroup(AttachAction): """Attach a portgroup to a network Contains all the necessary information to attach a portgroup to a network (vif) """ portgroup_uuid: str network_id: str
[docs] def get_portlike_object(self, task): for portgroup in task.portgroups: if portgroup.uuid == self.portgroup_uuid: return portgroup return None
[docs] def portlike_uuid(self): return self.portgroup_uuid
def __str__(self): return _(f"Attach portgroup '{self.portgroup_uuid}' on node " f"'{self.node_uuid}' to network '{self.network_id}'" f"via trait {self.trait_action.trait_name}")
[docs] @dataclass(frozen=True) class GroupAndAttachPorts(RenderedAction): """Assemble a group of ports as a dynamic portgroup and attach it Contains all necessary information to assemble the new portgroup and attach it to a network (vif). """ port_uuids: list[str] network_id: str def __str__(self): return _(f"Assemble node '{self.node_uuid}' ports into a dynamic " f"portgroup and attach it to network '{self.network_id}'. " f"Selected Port UUIDs: {self.port_uuids}.")
[docs] @dataclass(frozen=True) class NoMatch(RenderedAction): """Returned by network planning when a trait action finds no matches""" reason: str def __str__(self): return _(f"No match found for action under trait " f"'{self.trait_action.trait_name}' " f"on node '{self.node_uuid}': {self.reason}")
[docs] @dataclass(frozen=True) class NotImplementedAction(RenderedAction): """Returned by network planning if an action has not been implemented""" action: Actions def __str__(self): return _(f"Action '{self.action.value}' not yet implemented.")