import json
from bfrt_helper.fields import StringField
from bfrt_helper.bfrt_info import BfRtInfo
import bfrt_helper.pb2.bfruntime_pb2 as bfruntime_pb2
from bfrt_helper.pb2.bfruntime_pb2 import WriteRequest
from bfrt_helper.pb2.bfruntime_pb2 import Update
# Atomicity = WriteRequest.Atomicity
from bfrt_helper.pb2.bfruntime_pb2 import (
SetForwardingPipelineConfigRequest as SetPipelineReq,
)
from bfrt_helper.match import Exact
from bfrt_helper.match import LongestPrefixMatch
from bfrt_helper.match import Ternary
from bfrt_helper.fields import Field, DevPort
class UnknownAction(Exception):
"""Exception raised when an action for a given table could not be found.
Args:
table_name (str): String containing the name of the table.
action_name (str): String containing the name of the action.
"""
def __init__(self, table_name, action_name):
msg = f"Could not find action {table_name}::{action_name}"
super().__init__(msg)
class UnknownActionParameter(Exception):
    """Exception raised when an action parameter for a given table could not be
found.
    Some actions have associated parameters; these are optional. They are
    supplied when the action is applied, for example (in P4)::
action send_to_multicast_group(MulticastGroupId_t group) {
ingress_metadata.multicast_grp_a = group;
}
Args:
table_name (str): String containing the name of the table.
action_name (str): String containing the name of the action.
param_name (str): String containing the name of the parameter.
"""
def __init__(self, table_name: str, action_name: str, param_name: str):
        msg = (
            "Could not find action parameter "
            f"{table_name}::{action_name}::{param_name}"
        )
super().__init__(msg)
class UnknownTable(Exception):
"""Exception raised when a given table could not be found.
Args:
table_name (str): String containing the name of the table.
"""
def __init__(self, table_name: str):
super().__init__(f"Could not find table {table_name}")
class UnknownKeyField(Exception):
"""Exception raised when a key field for a given table could not be found.
A key field is a part of the match construct, and includes a name and
data. When a table is executed, the key field names are used to retrieve
a value with the same name as defined in either metadata or headers. This
    value is compared with the key field's data on the basis of the field's
    defined match type::
table multicast {
key = {
hdr.vlan.id: exact;
}
actions = {
send_to_multicast_group;
}
const entries = {
100: send_to_multicast_group;
}
}
In this example, a key is defined to match on a vlan id which has been
parsed into the program's headers. The key field's name in this case is
    ``hdr.vlan.id`` and its match type is ``exact``. If a field with this name
    is not defined for the table, this exception is raised by some operations.
Args:
table_name (str): String containing the name of the table.
        field_name (str): String containing the name of the key field.
"""
def __init__(self, table_name: str, field_name: str):
super().__init__(f"Could not find key field {table_name}::{field_name}")
class MismatchedMatchType(Exception):
"""Exception raised when the match for a key field declared in a request
does not match that which is defined in the table.
The current accepted match types are:
* :py:class:`LongestPrefixMatch`
* :py:class:`Exact`
* :py:class:`Ternary`
Args:
field_name (str): The name of the field in question.
field_data (Field): The instance of the incorrect field.
expected (str): The name of the expected data type.
"""
def __init__(self, field_name: str, field_data: Field, expected: str):
clz = field_data.__class__.__name__
        msg = (
            f"Expected field type for {field_name} is {expected}, but have "
            f"{clz}"
        )
super().__init__(msg)
class MismatchedDataSize(Exception):
"""Exception raised when the data size of a field in a request does not
match that which is registered to the table.
Most fields have an associated bitwidth. This is defined in the P4 program,
and is presented in the BfRt info file if generated. While we can't do
strict type checking, we can compare the bitwidths of the input and target
fields (any field registered in this library, or defined by the user, will
be equivalent to any other field with the same bitwidth).
Args:
expected (int): Bitwidth of field as defined by P4 program.
observed (int): Bitwidth of field presented by the user.
"""
def __init__(self, expected: int, observed: int):
msg = f"Expected data size {expected} but have {observed}"
super().__init__(msg)
class InvalidActionParameter(Exception):
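    """Exception raised when an action parameter fails validation.

    This is raised by :py:meth:`BfRtHelper.create_action` when building the
    parameter's data field fails, e.g. because of a size mismatch.
    """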
def __init__(self, table_name: str, action_name: str, param_name: str, reason: str):
msg = f"{table_name}::{action_name}::{param_name}: {reason}"
super().__init__(msg)
class BfRtHelper:
"""Barefoot Runtime gRPC Helper Class"""
def __init__(self, device_id, client_id, bfrt_info):
self.device_id = device_id
self.client_id = client_id
self.bfrt_info = bfrt_info
    def create_subscribe_request(
self,
learn: bool = True, # Receive learn notifications
timeout: bool = True, # Receive timeout notifications
port_change: bool = True, # Receive port state change notifications
request_timeout: int = 10, # Subscribe response timeout
):
"""Create a subscribe request for registering with a device.
After a gRPC connection has been established between a client and the
Barefoot Runtime server, a ``subscribe`` request must be issued by the
client in order for the runtime to act on commands issued by the client
as well as send and receive any other messages.
Args:
learn (bool, optional): Enable learn notifications. Provided through
digest messages from the gRPC stream channel. Default is
``True``.
timeout (bool, optional): Receive timeout notifications. Default is
``True``.
port_change (bool, optional): Receive port state change
notifications. Default is ``True``.
            request_timeout (int, optional): Subscribe response timeout.
                Default is ``10``.
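
        Example (a minimal sketch; ``stream_requests`` is an assumed queue of
        requests fed into the gRPC ``StreamChannel`` call)::

            helper = BfRtHelper(device_id=0, client_id=1, bfrt_info=bfrt_info)
            request = helper.create_subscribe_request(port_change=False)
            stream_requests.put(request)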
"""
subscribe = bfruntime_pb2.Subscribe()
subscribe.device_id = self.device_id
notifications = bfruntime_pb2.Subscribe.Notifications()
notifications.enable_learn_notifications = learn
notifications.enable_idletimeout_notifications = timeout
notifications.enable_port_status_change_notifications = port_change
subscribe.notifications.CopyFrom(notifications)
request = bfruntime_pb2.StreamMessageRequest()
request.client_id = self.client_id
request.subscribe.CopyFrom(subscribe)
return request
    def create_write_request(
self,
program_name: str,
atomicity=WriteRequest.Atomicity.CONTINUE_ON_ERROR,
target: dict = {},
):
"""Creates a basic write request with no updates.
This creates the base gRPC object for the majority of runtime program
manipulation.
The optional target parameter contains any of the following:
        * ``pipe_id``: The ID of the pipe to apply the write to. Tofino
          devices may contain multiple "pipes", which in turn are composed of
          multiple ports. Each pipe is capable of being loaded with its own
          P4 program that acts independently from other pipes. An ID is
          therefore necessary to identify it.
        * ``direction``: The Tofino Native Architecture has a concept of
          ingress and egress pipelines. It is possible (as far as we know)
          that a single control can be associated with both ingress and
          egress pipelines, but their "instantiation" will contain tables
          unique to the direction.
* ``prsr_id``: TODO
The default argument for this is equivalent to:
.. code:: python
{
'pipe_id': 0xFFFF, # All pipes
'direction': 0xFF, # All directions
'prsr_id': 0xFF # TODO
}
Args:
program_name (str): The name of the program to target.
atomicity (WriteRequest.Atomicity): Controls the behaviour of a
write request with respect to batched updates, i.e., whether
any errors are ignored, errors cause rollbacks of previously
submitted writes in the batch, or whether the write is treated
as a single atomic transaction.
            target (dict): An optional dictionary specifying the target
                ``pipe_id``, ``direction`` and ``prsr_id`` described above.
Returns:
bfruntime_pb2.WriteRequest
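
        Example (a sketch; this targets only pipe 0 instead of the default of
        all pipes)::

            request = helper.create_write_request(
                "my_program",
                target={"pipe_id": 0},
            )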
"""
request = bfruntime_pb2.WriteRequest()
request.client_id = self.client_id
request.p4_name = program_name
request.atomicity = atomicity
device = bfruntime_pb2.TargetDevice()
device.device_id = self.device_id
device.pipe_id = target.get("pipe_id", 0xFFFF)
device.direction = target.get("direction", 0xFF)
device.prsr_id = target.get("prsr_id", 0xFF)
request.target.CopyFrom(device)
return request
def create_read_request(self, program_name: str):
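        """Creates a basic read request with no entities.

        Like :py:meth:`create_write_request`, the request targets all pipes,
        directions and parsers on the configured device.
        """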
request = bfruntime_pb2.ReadRequest()
request.client_id = self.client_id
request.p4_name = program_name
# request.atomicity = atomicity
target = bfruntime_pb2.TargetDevice()
target.device_id = self.device_id
target.pipe_id = 0xFFFF
target.direction = 0xFF
target.prsr_id = 0xFF
request.target.CopyFrom(target)
return request
def create_table_entry(self, table_name: str):
"""Generates empty table message
This produces the base gRPC object for a table entry for the supplied
table name.
Args:
table_name (str): Name of table to target
Returns:
bfruntime_pb2.TableEntry
"""
table_id = self.bfrt_info.get_table_id(table_name)
if table_id is None:
raise UnknownTable(table_name)
table_entry = bfruntime_pb2.TableEntry()
table_entry.table_id = table_id
return table_entry
    def create_key_field(self, table_name: str, field_name: str, data: Field):
"""Generates key field component of a gRPC message
The performs a lookup of the key id (as messages contain a table ID and
not the name) using the :py:class:`BfRtInfo` object and also type checks
the match type.
Args:
table_name (str): Name of table to target
field_name (str): Name of specific key field
            data (Field): Field data supplied as an instance of a `Field`
                object.
Returns:
bfruntime_pb2.KeyField
Raises:
            UnknownKeyField: If the :py:class:`BfRtInfo` object did not contain
                a valid key field for the given table and field names.
            MismatchedMatchType: If the match type (e.g. :py:class:`Exact`)
                does not match the match type defined for the field actually
                present in the table.
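
        Example (a sketch; the table and field names are illustrative only)::

            key_field = helper.create_key_field(
                "pipe.Ingress.forward",
                "ig_intr_md.ingress_port",
                Exact(DevPort(64)),
            )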
"""
info_key_field = self.bfrt_info.get_key(table_name, field_name)
if info_key_field is None:
raise UnknownKeyField(table_name, field_name)
bfrt_key_field = bfruntime_pb2.KeyField()
bfrt_key_field.field_id = info_key_field.id
if info_key_field.match_type == "Exact":
if not isinstance(data, Exact):
raise MismatchedMatchType(field_name, data, "Exact")
bfrt_key_field.exact.value = data.value_bytes()
if info_key_field.match_type == "LongestPrefixMatch":
if not isinstance(data, LongestPrefixMatch):
raise MismatchedMatchType(field_name, data, "LongestPrefixMatch")
bfrt_key_field.lpm.value = data.value_bytes()
bfrt_key_field.lpm.prefix_len = data.prefix
if info_key_field.match_type == "Ternary":
if not isinstance(data, Ternary):
raise MismatchedMatchType(field_name, data, "Ternary")
bfrt_key_field.ternary.value = data.value_bytes()
bfrt_key_field.ternary.mask = data.mask_bytes()
return bfrt_key_field
def create_data_field(self, field, value):
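        """Generates the data field component of a gRPC message.

        The supplied value is mapped onto the appropriate protobuf field
        according to its Python type (``Field``/``bytes``, ``float``, ``str``,
        ``bool`` or ``list``). Where a bitwidth is defined for the target
        field, it is checked against the supplied :py:class:`Field`.
        """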
data_field = bfruntime_pb2.DataField()
data_field.field_id = field.id
        if isinstance(value, bytes):
            data_field.stream = value
        elif isinstance(value, Field):
            if field.type["type"] == "bytes":
                if value.bitwidth != field.type["width"]:
                    raise MismatchedDataSize(field.type["width"], value.bitwidth)
            elif field.type["type"] == "uint16":
                if value.bitwidth != 16:
                    raise MismatchedDataSize(16, value.bitwidth)
            elif field.type["type"] == "uint32":
                if value.bitwidth != 32:
                    raise MismatchedDataSize(32, value.bitwidth)
            data_field.stream = value.to_bytes()
elif isinstance(value, float):
data_field.float_val = value
elif isinstance(value, str):
if 'choices' in field.type:
choices = field.type['choices']
if value not in choices:
raise Exception(f'String value {value} not in choices: {choices}')
data_field.str_val = value
elif isinstance(value, bool):
data_field.bool_val = value
elif isinstance(value, list):
if len(value) > 0:
inner_value = value[0]
if isinstance(inner_value, int):
data_field.int_arr_val.val.extend(value)
elif isinstance(inner_value, bool):
data_field.bool_arr_value.val.extend(value)
else:
data = [self.create_data_field(x) for x in value]
data_field.container_arr_value.val.extend(data)
else:
raise Exception("Unknown data type!")
return data_field
def create_key_fields(self, table_name, key_fields):
"""Create the key fields gRPC message component"""
fields = []
for field_name, field_data in key_fields.items():
field = self.create_key_field(table_name, field_name, field_data)
fields.append(field)
return fields
def create_action(self, table_name, action_name, action_params):
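        """Creates the action portion (``TableData``) of a table entry.

        The action and each of its parameters are looked up in the
        :py:class:`BfRtInfo` object; unknown names raise
        :py:class:`UnknownAction` or :py:class:`UnknownActionParameter`, and
        a parameter that fails validation raises
        :py:class:`InvalidActionParameter`.
        """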
info_action = self.bfrt_info.get_action_spec(table_name, action_name)
if info_action is None:
raise UnknownAction(table_name, action_name)
bfrt_table_data = bfruntime_pb2.TableData()
bfrt_table_data.action_id = info_action.id
if action_params is not None:
for param_name, param_data in action_params.items():
info_action_field = self.bfrt_info.get_action_field(
table_name, action_name, param_name
)
if info_action_field is None:
raise UnknownActionParameter(table_name, action_name, param_name)
try:
bfrt_data_field = self.create_data_field(
info_action_field, param_data
)
bfrt_table_data.fields.extend([bfrt_data_field])
except MismatchedDataSize as err:
raise InvalidActionParameter(
table_name, action_name, param_name, str(err)
)
return bfrt_table_data
    def create_table_write(
self,
program_name,
table_name,
key,
action_name=None,
action_params=None,
update_type=Update.Type.INSERT, # Type not documented here because it
# absolutely destroys generated docs.
):
"""Create a match-action table write request
.. note::
This does not support batched writes.
Args:
program_name (str): Name of program to target.
table_name (str): Name of table within the program. Depending on how
the program has been compiled, you may need to prefix this name
with "pipe", e.g. "pipe.IngressControl.table".
key (dict): Dictionary of match field names to their match. The keys
should be strings, and the value should be an instance of a
:py:class:`Match` (e.g. :py:class:`Exact`) constructed with the correct
field type as defined in the program.
action_name (str): Name of the action to execute on a match.
action_params (dict): Dictionary of parameter names and their values
to pass to the executed action. As before, the key should be a
string, but the value should derive from :py:class:`Field`.
            update_type (Update.Type): The type of operation to take place,
                e.g., INSERT, MODIFY, DELETE. The default is
                ``Update.Type.INSERT``.
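
        Example (a sketch; the program, table, field and action names are
        illustrative only, and ``client`` is an assumed BfRuntime gRPC stub)::

            request = helper.create_table_write(
                program_name="my_program",
                table_name="pipe.Ingress.forward",
                key={"ig_intr_md.ingress_port": Exact(DevPort(0))},
                action_name="Ingress.send",
                action_params={"port": DevPort(64)},
            )
            client.Write(request)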
"""
bfrt_request = self.create_write_request(program_name)
bfrt_table_entry = self.create_table_entry(table_name)
bfrt_key_fields = self.create_key_fields(table_name, key)
bfrt_table_entry.key.fields.extend(bfrt_key_fields)
if action_name is not None:
bfrt_action = self.create_action(table_name, action_name, action_params)
bfrt_table_entry.data.CopyFrom(bfrt_action)
bfrt_update = bfrt_request.updates.add()
bfrt_update.type = update_type
bfrt_update.entity.table_entry.CopyFrom(bfrt_table_entry)
return bfrt_request
    def create_table_data_write(
self,
program_name: str,
table_name,
key,
data,
update_type=bfruntime_pb2.Update.Type.INSERT,
):
"""Create a table write for arbitrary tables.
It is possible to write to many other tables exposed to the runtime
interface which can be used to manipulate copy to cpu settings,
multicast groups, port settings etc.
While these tables can be considered as "ordinary" database tables,
they are still manipulated by the same mechanisms as a match-action
update.
.. note::
This does not support batched writes.
Args:
program_name (str): Name of program to target.
table_name (str): Name of table within the program. Depending on how
the program has been compiled, you may need to prefix this name
with "pipe", e.g. "pipe.IngressControl.table".
key (dict): Dictionary of match field names to their match. The keys
should be strings, and the value should be an instance of a
:py:class:`Match` (e.g. :py:class:`Exact`) constructed with the correct
field type as defined in the program.
data (Field): The data to be added to the table. Like match-action
updates, this can be expressed handily with a :py:class:`Field`.
                This could involve a :py:class:`StringField`; however, a use
                case for this hasn't been firmly established.
            update_type (Update.Type): The type of operation to take place,
                e.g., INSERT, MODIFY, DELETE. The default is
                ``Update.Type.INSERT``.
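
        Example (a sketch; the ``$pre.port`` table and its field names follow
        :py:meth:`create_copy_to_cpu` and may differ between SDE versions)::

            request = helper.create_table_data_write(
                program_name="my_program",
                table_name="$pre.port",
                key={"$DEV_PORT": Exact(DevPort(64))},
                data={"$COPY_TO_CPU_PORT_ENABLE": True},
                update_type=bfruntime_pb2.Update.Type.MODIFY,
            )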
"""
bfrt_request = self.create_write_request(program_name)
bfrt_table_entry = self.create_table_entry(table_name)
bfrt_key_fields = self.create_key_fields(table_name, key)
bfrt_table_entry.key.fields.extend(bfrt_key_fields)
bfrt_table_data = bfruntime_pb2.TableData()
for field_name, value in data.items():
field = self.bfrt_info.get_data_field(table_name, field_name)
bfrt_data_field = self.create_data_field(field.singleton, value)
bfrt_table_data.fields.extend([bfrt_data_field])
bfrt_table_entry.data.CopyFrom(bfrt_table_data)
bfrt_update = bfrt_request.updates.add()
bfrt_update.type = update_type
bfrt_update.entity.table_entry.CopyFrom(bfrt_table_entry)
return bfrt_request
    def create_table_read(self, program_name, table_name, key):
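        """Create a match-action table read request.

        The request contains a single entity whose key is built from ``key``
        in the same way as :py:meth:`create_table_write`.
        """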
bfrt_request = self.create_read_request(program_name)
bfrt_table_entry = self.create_table_entry(table_name)
bfrt_key_fields = self.create_key_fields(table_name, key)
bfrt_table_entry.key.fields.extend(bfrt_key_fields)
update = bfrt_request.entities.add()
update.table_entry.CopyFrom(bfrt_table_entry)
return bfrt_request
    def create_copy_to_cpu(self, program_name, port):
        """Create a request for copying data to the CPU
Warning:
Experimental.
"""
bfrt_request = self.create_write_request(program_name)
bfrt_table_entry = self.create_table_entry("$pre.port")
bfrt_key_field = self.create_key_field("$pre.port", "$DEV_PORT",
Exact(DevPort(port)))
bfrt_table_entry.key.fields.extend([bfrt_key_field])
info_cpu_port_field = self.bfrt_info.get_data_field(
"$pre.port", "$COPY_TO_CPU_PORT_ENABLE"
)
bfrt_cpu_port_field = self.create_data_field(
info_cpu_port_field.singleton, True
)
bfrt_table_entry.data.fields.extend([bfrt_cpu_port_field])
bfrt_update = bfrt_request.updates.add()
bfrt_update.type = bfruntime_pb2.Update.Type.MODIFY
bfrt_update.entity.table_entry.CopyFrom(bfrt_table_entry)
return bfrt_request
    def create_set_pipeline_request(
self, program_name, bfrt_path, context_path, binary_path
):
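        """Create a request to load a compiled P4 pipeline onto the device.

        The supplied BfRt info, context and binary files are read and embedded
        into the request, which performs a verify followed by a warm init
        begin and end.

        Warning:
            ``base_path`` is currently hard-coded.
        """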
# Need to figure out base_path properly later
request = bfruntime_pb2.SetForwardingPipelineConfigRequest()
request.client_id = self.client_id
request.device_id = self.device_id
request.base_path = 'install/share/tofinopd/'
request.action = SetPipelineReq.VERIFY_AND_WARM_INIT_BEGIN_AND_END
config = request.config.add()
config.p4_name = program_name
        with open(bfrt_path, "rb") as f:
            config.bfruntime_info = f.read()
profile = config.profiles.add()
profile.profile_name = "pipe"
        with open(context_path, "rb") as f:
            profile.context = f.read()
        with open(binary_path, "rb") as f:
            profile.binary = f.read()
profile.pipe_scope.extend([0, 1, 2, 3])
return request
    def create_get_pipeline_request(self):
request = bfruntime_pb2.GetForwardingPipelineConfigRequest()
request.device_id = self.device_id
request.client_id = self.client_id
return request
def make_bfrt_helper(host, device_id, client_id, bfrt_path):
""" Create a BfRtHelper instance with BfRtInfo already loaded. """
    with open(bfrt_path) as f:
        bfrt_data = json.load(f)
bfrt_info = BfRtInfo(bfrt_data)
return BfRtHelper(device_id, client_id, bfrt_info)
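# Example (usage sketch; the host and path below are illustrative only):
#
#   helper = make_bfrt_helper(
#       "localhost:50052", device_id=0, client_id=1,
#       bfrt_path="build/my_program/bfrt.json",
#   )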
def make_empty_bfrt_helper(device_id, client_id):
""" Make an empty BfRtInfo object """
bfrt_info = BfRtInfo({})
return BfRtHelper(device_id, client_id, bfrt_info)
def make_merged_config(response):
""" Create a merged jSON configuration
This creates a merged jSON configuration from a BfRt response containing
both program and internal non-p4 data.
"""
non_p4_data = response.non_p4_config.bfruntime_info.decode('utf-8')
non_p4_config = json.loads(non_p4_data)
configs = []
for config in response.config:
p4_config = json.loads(config.bfruntime_info)
p4_config.get('tables').extend(non_p4_config.get('tables'))
configs.append(p4_config)
return configs
def make_port_map(program_name, bfrt_helper, client, ports):
""" Create a map of port to device port
    For a given set of front-panel port names, this requests the corresponding
    device port information over gRPC.
"""
request = bfrt_helper.create_read_request(program_name)
for port in ports:
table_entry = bfrt_helper.create_table_entry('$PORT_STR_INFO')
key_field = bfrt_helper.create_key_fields(
'$PORT_STR_INFO',
key_fields={
'$PORT_NAME': Exact(StringField(port))
})
table_entry.key.fields.extend(key_field)
update = request.entities.add()
update.table_entry.CopyFrom(table_entry)
response = client.Read(request)
    data = next(response)
result = {}
for entity in data.entities:
port_name = entity.table_entry.key.fields[0].exact.value.decode('utf-8')
dev_port = entity.table_entry.data.fields[0].stream
dev_port = int.from_bytes(dev_port, byteorder='big')
result[port_name] = dev_port
return result
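# Example (usage sketch; ``client`` is an assumed BfRuntime gRPC stub and the
# front-panel port names are illustrative only):
#
#   port_map = make_port_map("my_program", helper, client, ["1/0", "2/0"])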