Source code for bfrt_helper.match

from bfrt_helper.util import InvalidOperation
from bfrt_helper.util import InvalidValue
from bfrt_helper.util import mask_from_prefix
from bfrt_helper.fields import Field
from bfrt_helper.fields import IPv4Address
from bfrt_helper.fields import MismatchedTypes


[docs]class MismatchedKeys(Exception): """Raised when the key's name doesn't match when comparing two match objects """ def __init__(self, message): super().__init__(message)
[docs]class Masked: """Base class for all matches which can be represented in terms of a value and a mask. """ def __init__(self): self.value = None self.mask = None
[docs] def subset_of(self, other: "Masked") -> bool: """Compares whether ``self`` is a subset of ``other``. If all the elements in ``self`` are also in ``other``, then this object is a subset. This is ``True`` even if both objects contain exactly the same elements. This is broadly equivalent to a ``<=`` (less than or equal to) operation, and is called by the :py:meth:`Masked.__le__` special method. See also: :py:meth:`Masked.proper_subset_of` Args: other (Masked): Other masked value to compare against. Returns: bool: True if ``self`` is subset of ``other`` """ if not self.mask & other.mask == other.mask: return False return (self.value & other.mask) == (other.value & other.mask)
[docs] def superset_of(self, other: "Masked") -> bool: """Compares whether ``other`` is a subset of ``self``. If all the elements in ``other`` are also in ``self``, then this object is a superset. This is ``True`` even if both objects contain exactly the same elements. This is broadly equivalent to a ``>=`` (less than or equal to) operation, and is called by the :py:meth:`Masked.__ge__` special method. See also: :py:meth:`Masked.proper_superset_of` Args: other (Masked): Other masked value to compare against. Returns: bool: True if ``self`` is superset of ``other`` """ if not self.mask & other.mask == self.mask: return False return (self.value & self.mask) == (other.value & self.mask)
[docs] def proper_subset_of(self, other: "Masked") -> bool: """Compares with another match, where ``self`` is found to be a proper subset of the other. If all the elements in ``self`` are present in ``other``, and, ``other`` contains at least one element not in ``self``, then this object can be considered a proper subset. This is broadly equivalent to a ``<`` (less than) operation, and is called by the :py:meth:`Masked.__lt__` special method. See also: :py:meth:`Masked.subset_of` Args: other (Masked): Other masked value to compare against. Returns: bool: True if ``self`` is proper superset of ``other`` """ return self.subset_of(other) and not self == other
[docs] def proper_superset_of(self, other: "Masked") -> bool: """Compares with another match, where ``self`` is found to be a proper superset of the other. If all the elements in ``other`` are present in ``self``, and, ``self`` contains at least one element not in ``other``, then this object can be considered a proper superset. This is broadly equivalent to a ``>`` (less than) operation, and is called by the :py:meth:`Masked.__gt__` special method. See also: :py:meth:`Masked.uperset_of` Args: other (Masked): Other masked value to compare against. Returns: bool: True if ``self`` is proper superset of ``other`` """ return self.superset_of(other) and not self == other
[docs] def intersection(self, other: "Masked") -> "Masked": """Returns the intersection of ``self`` and ``other``. The intersection of two masked values is the common potential values between ``self`` and ``other``. The following relations should hold true, noting the use of proper subset and superset comparisons: .. code-block:: python intersection = a & b assert a > intersection # Intersection is subset of a assert b > intersection # Intersection also subset of b Since the intersection is the common elements, and if both a and b are proper supersets of the intersection, they must contain different/unique values. Returns: Masked: Contains the elements which are common to ``self`` and ``other``. """ return self.__class__( value=self.value | other.value, mask=self.mask | other.mask )
[docs] def merged(self, other: "Masked") -> "Masked": """ Derives a :py:class:`Masked` object whose mask has all "don't care" bits from both ``self`` and ``other``. .. note:: This is *not* a union. As an example, a ternary expression :math:`A = 0101xxxx` contains all possible values beginning with :math:`0101`. Another value :math:`B = xxxx0101` contains all the possible values ending in :math:`0101`. A merge between the two yields the ternary expression :math:`I = xxxxxxxx`, where another value :math:`C = 11001100` is subset to :math:`I` but is in neither :math:`A` or :math:`B`. If it were a union, any value that is subset must be in either :math:`A` or :math`B`. Returns: Masked: Contains values derived from """ new_mask = self.mask & other.mask return self.__class__(self.value & new_mask, new_mask)
[docs] def overlaps(self, other: "Masked") -> bool: """Returns whether or not two ``Masked`` matches overlap. Overlapping ternary expressions are defined where some, but not all, elements in a ternary :math:`A` exist in a ternary :math:`B`, and, some, but not all, elements in :math:`B` exist in :math:`A`. That is to say that they are neither proper superset or subset of each other, but they share elements. If only one of these conditions held true, they would either be a proper subset or superset of one another. A corollary to this is that longest prefix matches do not overlap, they are strictly a subset or superset of one another. That is to say that :math:`A`, being a proper superset of the intersection :math:`I`, has at least one element not in :math:`I`, and, :math:`B`, being a proper superset of :math:`I`, has at least one element not in intersection :math:`I`, implying that the elements contained in the differences between either :math:`A` or :math:`B` and the intersection are unique to each other. """ c_mask = self.mask & other.mask return not (c_mask == self.mask or c_mask == other.mask) and ( self.value & c_mask == other.value & c_mask )
def __hash__(self) -> int: """Hashes the value and mask of the match. This is useful if the expression is to be used as a key in a dictionary like object. Args: Masked: Returns: int: Hash of ``self.mask`` and ``self.value``. """ return hash((self.value, self.mask))
[docs] def __le__(self, other: "Masked") -> bool: """See :meth:`subset_of`""" return self.subset_of(other)
[docs] def __ge__(self, other: "Masked") -> bool: """See :meth:`superset_of`""" return self.superset_of(other)
[docs] def __lt__(self, other: "Masked") -> bool: """See :meth:`proper_subset_of`""" return self.proper_subset_of(other)
[docs] def __gt__(self, other: "Masked") -> bool: """See :meth:`proper_superset_of`""" return self.proper_superset_of(other)
[docs] def __eq__(self, other: "Masked") -> bool: """Equality compares two ternary expressions. If both value and mask are the same between the two objects, then ``True`` is returned. """ return (self.value == other.value) & (self.mask == other.mask)
[docs] def __and__(self, other: "Masked") -> "Masked": """Returns the intersection of ``self`` and ``other``. see :meth:`intersection` """ return self.intersection(other)
[docs] def __or__(self, other: "Masked") -> "Masked": """Returns a ternary with a merged mask of ``self`` and ``other``. see :meth:`merged` """ return self.merged(other)
[docs] def __iter__(self) -> "Masked.__iter__.iterator": """Creates an `iterator` that returns consecutive elements in the set of ``Masked``, starting from the current value. The iterator itself is not efficient, it's internal value is incremented until it finds a match on every iteration. When the iterator is dereferenced, it returns either an LPM or Ternary expression, depending on what the derived class is. Examples: Using a ``PortID``: >>> match = Ternary(PortId(0x0), mask=0xc) >>> [str(x) for x in iter(match)] [ '0x0 &&& 0xf3', '0x4 &&& 0xf3', '0x8 &&& 0xf3', '0xc &&& 0xf3' ] Or an ``IPv4Address``: >>> match = Ternary(IPv4Address("192.168.42.24"), .. mask="255.255.255.252") >>> [str()] [ '192.168.42.24 &&& 255.255.255.252', '192.168.42.25 &&& 255.255.255.252', '192.168.42.26 &&& 255.255.255.252', '192.168.42.27 &&& 255.255.255.252', ] Returns: An iterator object that """ def iterator(ternary): global_max_value = (1 << self.value.bitwidth) - 1 local_max_val = global_max_value - self.mask.value local_max_val = self.value.value | local_max_val temp = self.value.value match_cls = self.__class__ value_cls = self.value.__class__ yield match_cls(value_cls(temp), value_cls(global_max_value)) initial = self.value.value mask = self.mask.value while temp < local_max_val: temp += 1 if temp & mask != initial: continue if temp & initial != temp: yield match_cls(value_cls(temp), value_cls(global_max_value)) return iterator(self)
[docs]class Ternary(Masked): """Matches values on specific bits. A ``Ternary`` match is a match type which matches on specific bits of the value. This is achieved by the use of a mask; when a match is performed, both the the expected value and the compared value are ANDed with the mask, and an equality operation can be performed without comparing every single bit. ``Ternary`` values are associated with TCAM () memory; and are stored in hardware as `trits`. These are like bits but have 3 states, on, off, and "don't care". Note: Masks could be represented by either ``0`` or ``1`` for a match. In this case, ``1`` bits represent bit's which are matched against. This class represents a ternary expression as a combination of an integer value and a mask. This is because: * Python supports arbitrary length integers; * It is easier to perform bitwise operations on integers. Args: value (Field): The instance of a field, e.g. ``VlanID()``, ``MACAddress()``, etc. mask (Field): The mask representing the bits to match against, either as an instance of the value's type, or, a value that can be passed to that type's constructor. dont_care (bool): If ``True``, no bits will be considered. The semantics of this match can change depending on whether a mask or ``dont_care`` is specified. By default, creating a ``Ternary`` match without a ``mask`` will match on every single bit, effectively acting in the same way as :class:`bfrt_helper.match.Exact`. The mask value does not have to be specified explictly as the same type as the value, any value will be accepted so long as it would be accepted by the value types constructor. If ``dont_care`` is specified, no bits will be compared (i.e, the mask will contain all zeroes). This means that it will match on any value it's compared against. This can be useful if a `table` specifies multiple key fields, and you wanted to match on another field, but not this specific ``Ternary`` element. Examples: A match on a ``PortId`` which only considers the 4 least significant bits:: my_field = Ternary(PortId(42), PortId(0xf)) A match on a ``PortId`` which only considers the 4 least significant bits, but not specifying the mask in terms of the original type:: my_field = Ternary(PortId(42), 0xf) A match that will match exactly on a ``PortId`` of 42:: my_field = Ternary(PortId(42)) A match that will match on any other ``PortId``:: my_field = Ternary(PortId(42), dont_care=True) """ def __init__(self, value: Field, mask: Field = None, dont_care=False): super().__init__() self.max_value = (2**value.bitwidth) - 1 self.value = value self.mask = mask if mask is None: required_bytes = (value.bitwidth + 7) // 8 if dont_care: mask_int = 0 mask_bytes = mask_int.to_bytes(required_bytes, "big") mask = value.__class__.from_bytes(mask_bytes) else: mask_int = (1 << value.bitwidth) - 1 mask_bytes = mask_int.to_bytes(required_bytes, "big") mask = value.__class__.from_bytes(mask_bytes) elif not isinstance(mask, value.__class__): mask = value.__class__(mask) self.value = value & mask self.mask = mask
[docs] def value_bytes(self): """Returns the match value as bytes. Returns: bytes: Internal value converted into bytes. """ return self.value.to_bytes()
[docs] def mask_bytes(self): """Returns the mask value as bytes. Returns: bytes: Internal mask converted into bytes. """ return self.mask.to_bytes()
[docs] @classmethod def dont_care(self): """Not implemented""" pass
def __str__(self): """Returns a string representation of the match. The string representation is output in terms of how this would be written in a P4 program. A value in P4 which is two numbers separated by a triple ampersand (in the correct context) is a ternary expression. Example: >>> match = Ternary(PortId(0x42), 0x0f) >>> str(match) 0x42 &&& 0xf Returns: str: String representation of the match in terms of how it would be written in P4. """ if self.mask.value == self.max_value: return str(self.value) return f"{str(self.value)} &&& {str(self.mask)}" def __repr__(self): return f"Ternary({repr(self.value)}, mask={repr(self.mask)})"
[docs]class LongestPrefixMatch(Masked): """Class that represents only the first *prefix* bits are considered in a match.""" def __init__(self, value: Field, prefix: int): super().__init__() if prefix > value.bitwidth: msg = f"Prefix {prefix} is greater than the maximum allowed for " msg += f"this field. [bitwidth={value.bitwidth}]" raise InvalidValue(msg) self.mask = value.__class__((2**prefix - 1) << (value.bitwidth - prefix)) self.value = value & self.mask self.prefix = prefix def __str__(self): return f"{self.value}/{self.prefix}" def __repr__(self): return f"LongestPrefixMatch({repr(self.value)}, prefix={self.prefix})"
[docs] def value_bytes(self): return self.value.to_bytes()
[docs]class Exact: """An exact match Comparisons between exact matches are on the basis of equality alone. """ def __init__(self, value): self.value = value def __eq__(self, other): return self.value == other.value def __hash__(self): return hash(self.value)
[docs] def value_bytes(self): return self.value.to_bytes()
class Key: """A collection of fields representing the key segment of a table defined in a P4 program. """ def __init__(self, **fields): self.fields = fields def __gt__(self, other): """See :meth:`proper_superset_of`""" return self.proper_superset_of(other) and not self == other def __lt__(self, other): """See :meth:`proper_subset_of`""" return self.proper_subset_of(other) and not self == other def __ge__(self, other): """See :meth:`superset_of`""" return self.superset_of(other) def __le__(self, other): """See :meth:`subset_of`""" return self.subset_of(other) def __eq__(self, other): return self.equal_(other) def __hash__(self): return hash((*self.fields.keys(), *self.fields.values())) def __and__(self, other): return self.intersection(other) def __equality_check(self, other): for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if k1 != k2: raise MismatchedKeys(f"Key {k1} is not equal to {k2}") if type(v1) is not type(v2): raise MismatchedTypes( ( f"Type of {v1} ({type(v1)}) is not equal to {v2} ", f"({type(v2)})", ) ) def conflicts(self, other): acc = True self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if isinstance(v1, Masked): acc = acc and v1.conflicts(v2) else: acc = acc and v1 == v2 return acc def intersection(self, other): """Calculates the intersection of values between two matches. Raises: MismatchedKeys: Raised if a key does not match it's respective compared match. Implies that match fields must be ordered correctly. """ args = {} self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if isinstance(v1, Ternary): # args.append(v1.intersection(v2)) args[k1] = v1.intersection(v2) elif isinstance(v1, LongestPrefixMatch): # args.append(LongestPrefixMatch(v1.value | v2.value, max(v1.prefix, # v2.prefix))) args[k1] = LongestPrefixMatch( v1.value | v2.value, max(v1.prefix, v2.prefix) ) else: if v1 != v2: raise InvalidOperation( "Trying to calculate the difference " "on matches containing non-equal exact values." ) args[k1] = v1 return Key(**args) def superset_of(self, other): """Calculates whether one match is a superset of another For this to hold true, all fields must be superset of their compared respective fields. See :meth:`Masked.superset_of`. Raises: MismatchedKeys: Raised if a key does not match it's respective compared match. Implies that match fields must be ordered correctly. """ acc = True self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if isinstance(v1, Masked): acc = acc and v1 >= v2 else: acc = acc and v1 == v2 return acc def subset_of(self, other): """Calculates whether one match is a superset of another For this to hold _true,_ all fields must be subset of their compared respective fields. See :meth:`Masked.subset_of`. Raises: MismatchedKeys: Raised if a key does not match it's respective compared match. Implies that match fields must be ordered correctly. """ acc = True self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if isinstance(v1, Masked): acc = acc and v1 <= v2 else: acc = acc and v1 == v2 return acc def proper_superset_of(self, other): """Calculates whether one match is a proper superset of another For this to hold true, all fields must be proper superset of their compared respective fields. See :meth:`Masked.proper_superset_of`. Raises: MismatchedKeys: Raised if a key does not match it's respective compared match. Implies that match fields must be ordered correctly. """ acc = True self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if isinstance(v1, Masked): acc = acc and v1 > v2 else: acc = acc and v1 == v2 return acc def proper_subset_of(self, other): """Calculates whether one match is a proper superset of another For this to hold true, all fields must be proper superset of their compared respective fields. See :meth:`Masked.subset_of`. Raises: MismatchedKeys: Raised if a key does not match it's respective compared match. Implies that match fields must be ordered correctly. """ acc = True self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if isinstance(v1, Masked): acc = acc and v1 < v2 else: acc = acc and v1 == v2 return acc # return acc def equal_(self, other): """Calculates whether two matches are equal. For this to hold true, every single element must equal it's other. Raises: MismatchedKeys: Raised if a key does not match it's respective compared match. Implies that match fields must be ordered correctly. """ acc = True self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): acc = acc and v1 == v2 return acc def overlaps(self, other): acc = True if self <= other: return False if self >= other: return False if self == other: return False self.__equality_check(other) for (k1, v1), (k2, v2) in zip(self.fields.items(), other.fields.items()): if isinstance(v1, Masked): acc = acc and (v1 >= v2 or v1 <= v2 or v1.overlaps(v2)) else: acc = acc and v1 == v2 return acc def __str__(self): field_strings = [] for k, v in self.fields.items(): if isinstance(v, Ternary): if v.mask.value == 0: field_strings.append("_") else: field_strings.append(str(v)) else: field_strings.append(str(v)) return "{{ {} }}".format(", ".join(field_strings)) def __repr__(self): acc = "Match(" acc += ", ".join([f'"{k}={repr(v)}"' for k, v in self.fields.items()]) return acc + ")" class IPv4AddressTernary(Ternary): """A helper class for more easily expressing a ternary ``IPv4Address``.""" def __init__(self, value, *, prefix=None, mask=None, dont_care=False): if not isinstance(value, IPv4Address): value = IPv4Address(value) if prefix: mask = IPv4Address(mask_from_prefix(IPv4Address.bitwidth, prefix)) super().__init__(value, mask, dont_care) # def __str__(self): # print('IPv4AddressTernary.__str__') # if self.mask.value == self.max_value: # return str(self.value) # return f"{str(self.value)} &&& {str(self.mask)}"