PythonInteractingWithTheTransport
Python Guide Series
Architecture | Knowledge Base | Networking | Containers
The main entry point into the MADARA transport layer is the QoSTransportSettings class. The QoSTransportSettings class contains dozens of network-specific settings that govern basic configuration, quality-of-service, and filtering.
- MADARA Transport Layer
- Table of Contents
- Understanding the QoS Transport Settings class
- More Information
Creating a networked transport in MADARA generally requires editing QoSTransportSettings.type (which specifies the network transport type) and a list of hosts (for UDP, Multicast, Broadcast) or domains (which map to topics in DDS transports).
UDP requires a hosts vector with the ip:port of the sender first (where we want to bind and how we want to be known to others in the originator field of messages) and all peers in the rest of the vector. Multicast requires a valid multicast ip and port in the first entry of the hosts vector. Similarly, broadcast requires a valid broadcast ip (for the subnet of your machine) and port in the first entry of the hosts vector.
Example of UDP transport constructor
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a UDP transport
settings = transport.QoSTransportSettings()
settings.hosts.append("127.0.0.1:45000")
settings.hosts.append("127.0.0.1:45001")
settings.type = transport.TransportTypes.UDP
# create a knowledge base with the UDP transport settings
knowledge = engine.KnowledgeBase("agent1", settings)
# set our id to 0 and let the other agent know that we are ready
knowledge.set(".id", 0)
knowledge.set("agent{.id}.ready", 1)
Example of multicast transport constructor (Multicast IP range)
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("agent1", settings)
# set our id to 0 and let the other agent know that we are ready
knowledge.set(".id", 0)
knowledge.set("agent{.id}.ready", 1)
Example of broadcast transport constructor
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a broadcast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("192.168.1.255:15000")
settings.type = transport.TransportTypes.BROADCAST
# create a knowledge base with the broadcast transport settings
knowledge = engine.KnowledgeBase("agent1", settings)
# set our id to 0 and let the other agent know that we are ready
knowledge.set(".id", 0)
knowledge.set("agent{.id}.ready", 1)
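The host-list rules described above can be checked before a knowledge base is constructed. The following is a minimal sketch using only the Python standard library. The helper names `split_host` and `check_hosts`, and the string transport names, are hypothetical and not part of the MADARA API. Treating a UDP list without peers as a problem is an assumption based on the description above.

```python
import ipaddress


def split_host(entry):
    """Split an "ip:port" entry into an address object and an integer port."""
    ip_text, _, port_text = entry.rpartition(":")
    return ipaddress.ip_address(ip_text), int(port_text)


def check_hosts(kind, hosts):
    """Return a list of problems found in a hosts list (empty if none).

    kind is "udp", "multicast", or "broadcast" (illustrative names only).
    """
    if not hosts:
        return ["hosts list is empty"]

    problems = []
    first_ip, first_port = split_host(hosts[0])
    if not 0 < first_port < 65536:
        problems.append("port out of range: %d" % first_port)

    if kind == "multicast" and not first_ip.is_multicast:
        # the first entry must be a multicast address (224.0.0.0/4 for IPv4)
        problems.append("%s is not a multicast address" % first_ip)
    elif kind == "udp" and len(hosts) < 2:
        # assumption: a UDP list holds our own ip:port first, then the peers
        problems.append("udp hosts list has no peers after our own ip:port")

    return problems
```

A broadcast address cannot be validated without knowing the subnet mask of the local machine, so this sketch only checks that a first entry exists and that its port is in range.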
When using real robots in a wide area, it may not be possible to reach all neighbors in a single message send. To aid developers trying to solve problems where multi-hops are required for message routing, MADARA supports rebroadcast TTLs set on the sender side. MADARA also supports opting out of rebroadcasts. In fact, all transports opt out by default.
Example routing between agent1 and agent3 that is not possible without rebroadcasts
Example of sending a packet with a rebroadcast ttl of 3
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
# set rebroadcast ttl to 3.
settings.set_rebroadcast_ttl(3)
# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)
Example of enabling participation in rebroadcasts
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
# enable participation in rebroadcasts up to 255 ttl
settings.enable_participant_ttl (255)
# Our own packets will have a ttl of 2
settings.set_rebroadcast_ttl (2)
knowledge = engine.KnowledgeBase("", settings)
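To see why both a sender-side ttl and receiver-side participation matter, the following sketch models rebroadcasting over a small topology in plain Python. It is a simplified model rather than MADARA's implementation: the `reachable` function, the `links` dictionary, and the rule that each participating receiver forwards once with the ttl reduced by one are assumptions made for illustration.

```python
from collections import deque


def reachable(links, participants, sender, ttl):
    """Return the set of agents that receive a message from sender.

    links: dict mapping each agent to the agents within its radio range.
    participants: agents that have opted in to rebroadcasting.
    ttl: rebroadcast ttl set by the sender (0 means no rebroadcasts).
    """
    received = set()
    queue = deque([(sender, ttl)])
    while queue:
        node, remaining = queue.popleft()
        for neighbor in links[node]:
            if neighbor == sender:
                continue
            first_time = neighbor not in received
            received.add(neighbor)
            # only participants forward, and only while ttl remains
            if first_time and remaining > 0 and neighbor in participants:
                queue.append((neighbor, remaining - 1))
    return received


# agent1 can only reach agent3 through agent2
links = {
    "agent1": ["agent2"],
    "agent2": ["agent1", "agent3"],
    "agent3": ["agent2"],
}
```

In this model, a message from agent1 reaches agent3 only when the sender uses a ttl of at least 1 and agent2 has opted in to rebroadcasts. A large ttl alone is not enough if no intermediate agent participates.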
The MADARA architecture monitors two different types of bandwidth usage: sending and receiving bandwidth. Setting a limit for either of these will serve as a hard limit that does not differentiate between priorities of information. Once a packet is sent, the bandwidth counters are updated, and if the bytes-per-second rate is higher than the limit you set, all packets will be dropped until the rate dips below the limit. These limiters do not currently look at the size of the packet that is being sent out, so if you are at 99,500 B/s and your limit is 100,000 B/s, the transport will try to send the next packet (even if it is 1 MB), update the bandwidth counter, and then not send another packet until the rate measured over the past 10s falls back below the limit.
For a more flexible bandwidth option that you can configure, see Bandwidth Filters.
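The hard-limit behavior described above can be illustrated with a small sliding-window model. This is a sketch and not MADARA code: the `SendLimiter` class is hypothetical, and averaging the bytes sent over a fixed 10 second window is an assumption about how the rate is computed.

```python
from collections import deque


class SendLimiter:
    """Toy model of a hard send limit averaged over a sliding window."""

    def __init__(self, limit_bytes_per_sec, window=10):
        self.limit = limit_bytes_per_sec
        self.window = window
        self.sent = deque()  # (timestamp, size) of packets actually sent

    def rate(self, now):
        """Average bytes per second sent during the last `window` seconds."""
        while self.sent and self.sent[0][0] <= now - self.window:
            self.sent.popleft()
        return sum(size for _, size in self.sent) / self.window

    def try_send(self, now, size):
        """Send unless the current rate exceeds the limit; size is ignored."""
        if self.rate(now) > self.limit:
            return False  # packet is dropped
        self.sent.append((now, size))
        return True


limiter = SendLimiter(100000)
```

Because the check happens before the packet size is counted, a sender sitting just under the limit can still emit one very large packet, after which everything is dropped until the older traffic leaves the window.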
Deadline enforcement is concerned with enforcing latency deadlines between reasoning entities on the network. Deadline enforcement requires some type of time synchronization protocol between agents in the network, preferably accurate to within a second, for it to be useful to the MADARA entities.
Enforcing send bandwidth limits
This type of enforcement is useful if you want to make sure no agent is using more than a certain limit (e.g., 100KB/s) individually.
Example of enforcing a send bandwidth limit
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
# set the send bandwidth limit to 100,000 bytes per second.
settings.set_send_bandwidth_limit(100000)
# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)
Enforcing send bandwidth limits based on received bandwidth
This type of enforcement is useful if you want to make sure the agent is not violating a collective bandwidth limit (e.g., 2MB/s) and is based on the amount of data received per second over the past 10s.
Example of enforcing a total bandwidth limit
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
# set the total bandwidth limit to 100,000 bytes per second.
settings.set_total_bandwidth_limit(100000)
Note that even though this is keying off received bandwidth, it affects messages being sent (i.e., enforcement is applied when the agent attempts to send or rebroadcast knowledge to the network).
Deadline enforcement aims to discard received packets that are too old to be useful to agent state reasoning. Handling old packets can significantly impede performance in an agent-saturated network, so clearing your queues quickly can aid the agent network.
Example of enforcing a transport latency deadline
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
# set the deadline to 10 seconds; older received packets are discarded
settings.set_deadline(10)
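The decision behind a deadline can be sketched in a few lines of plain Python. The `within_deadline` helper is hypothetical, and treating a deadline of zero or less as "no deadline" is an assumption made for this sketch rather than documented MADARA behavior.

```python
import time


def within_deadline(sent_time, deadline, now=None):
    """Return True if a packet sent at sent_time is still fresh enough.

    sent_time and now are timestamps in seconds; deadline is in seconds.
    A deadline of zero or less is treated as "no deadline" in this sketch.
    """
    if now is None:
        now = time.time()
    if deadline <= 0:
        return True
    return (now - sent_time) <= deadline
```

The comparison uses the sender's clock for `sent_time` and the receiver's clock for `now`, which is why time synchronization matters: if the receiver's clock runs 3 seconds ahead, a packet with a true latency of 8 seconds appears to be 11 seconds old and would miss a 10 second deadline.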
User callbacks can also be inserted into the transport layer to modify payloads. The MADARA transport system allows users to insert callbacks into three key operations: receive, send, and rebroadcast. Filter callbacks are given a number of arguments that are relevant to the filtering operation. As of version 1.1.9, these arguments include the following:
- args[0]: The knowledge record that the filter is acting upon
- args[1]: The name of the knowledge record, if applicable ("" if unnamed, but this should never happen)
- args[2]: The type of operation calling the filter (integer valued). Valid types are:
  - madara.transport.TransportContext.OperationTypes.IDLE_OPERATION (should never see)
  - madara.transport.TransportContext.OperationTypes.SENDING_OPERATION (transport is trying to send the record)
  - madara.transport.TransportContext.OperationTypes.RECEIVING_OPERATION (transport has received the record and is ready to apply the update)
  - madara.transport.TransportContext.OperationTypes.REBROADCASTING_OPERATION (transport is trying to rebroadcast the record -- only happens if rebroadcast is enabled in Transport Settings)
- args[3]: Bandwidth used while sending through this transport, measured in bytes per second.
- args[4]: Bandwidth used while receiving from this transport, measured in bytes per second.
- args[5]: Message timestamp (when the message was originally sent, in seconds)
- args[6]: Current timestamp (the result of time (NULL))
- args[7]: Knowledge Domain (partition of the knowledge updates)
- args[8]: Originator (Source ID of the sender)
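Since filters address these arguments by position, it can help to give the positions names. The sketch below does this with a named tuple from the standard library. `FilterArgs` and `unpack_filter_args` are hypothetical helpers, not part of MADARA, and the code relies only on `len()` and indexing of the argument list.

```python
from collections import namedtuple

# field order follows the argument indices listed above
FilterArgs = namedtuple("FilterArgs", [
    "record", "name", "operation", "send_bandwidth", "receive_bandwidth",
    "message_time", "current_time", "domain", "originator"])


def unpack_filter_args(args):
    """Name the first nine filter arguments; missing ones become None."""
    present = [args[i] for i in range(min(len(args), 9))]
    return FilterArgs(*(present + [None] * (9 - len(present))))
```

Inside a real filter, each element is a knowledge record, so a value such as the send bandwidth would still need a conversion call like `to_integer()` before it is compared with a number.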
The filter can add data to the payload by pushing a variable name (string) followed by a value, which can be a double, string, integer, byte array, or array of integers or doubles, just as you would do with a set operation. This can be useful if other reasoners in the network are expecting additional meta data for the update (which they are free to strip out or ignore in a receive filter, if they don't need the information).
The filter can also access any variable in the madara.knowledge.KnowledgeBase through the madara.knowledge.Variables facade. With the arguments and variables interfaces, developers can respond to transport events in highly dynamic and extensible ways.
To create a bandwidth filter, we simply need to create a filter that looks at argument index 3 (how much bandwidth we have used sending) or argument index 4 (how much bandwidth is being used--i.e., what we have received in bytes per second), depending on which type of bandwidth enforcement we want to do.
In the following example, we drop any packet members if we are over 100,000 bytes per second sending within the past 10s.
Example of enforcing a send bandwidth limit via filtering
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Do not allow more than 100k bytes per second
def enforce_send_limit(args, variables):
    # by default, we return an empty record, which means remove it
    result = engine.KnowledgeRecord()
    # args[3] is the send bandwidth, so we need at least four arguments
    if len(args) > 3:
        # only modify the return value with args[0] if we are under 100KB/s
        if args[3].to_integer() < 100000:
            result = args[0]
    return result
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
# add the above filter for all value types, applied before sending
settings.add_send_filter(engine.KnowledgeRecord.ValueTypes.ALL_TYPES,
                         enforce_send_limit)
# apply the same filter before rebroadcasting
settings.add_rebroadcast_filter(engine.KnowledgeRecord.ValueTypes.ALL_TYPES,
                                enforce_send_limit)
# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)
Filters can inspect argument indices 5 and 6 for information on the sent and received time for each packet. The difference between these two arguments is called the packet latency, and this latency value can inform the filter of deadline violations.
Example of enforcing a network latency deadline via filtering
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
def filter_deadlines(args, variables):
    # by default, we return an empty record, which means remove it
    result = engine.KnowledgeRecord()
    if len(args) >= 7:
        # args[5] is sent time, args[6] is current time
        # keep any packet with a latency of less than 5 seconds
        if args[6].to_integer() - args[5].to_integer() < 5:
            result = args[0]
    return result
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
# add the above filter for all value types, applied before sending
settings.add_send_filter(engine.KnowledgeRecord.ValueTypes.ALL_TYPES,
                         filter_deadlines)
# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)
Filters that drop packets can be very useful, but developers also have unlimited options for inflating or reducing packets and updates within the packets. Packet shaping is any operation that mutates a packet element into a different form. This can be operations like converting packets into XML, resizing an image payload, or even encrypting part of a packet with a private key.
In the following example, we simply encapsulate any string payload with the XML elements <item> and </item>.
Example of shaping a payload before it gets sent out
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
def add_item_tag(args, variables):
    # by default, we return an empty record, which means remove it
    result = engine.KnowledgeRecord()
    # if we have an arg and it is a string value
    if len(args) > 0 and args[0].is_string_type():
        # encase it in an <item> tag
        encased = "<item>"
        encased += args[0].to_string()
        encased += "</item>"
        result.set_value(encased)
    return result
# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
# add the above filter to string types
settings.add_send_filter(engine.KnowledgeRecord.ValueTypes.STRING, add_item_tag)
# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)
# each of these will be encased in an item tag after filtering
knowledge.set("name", "John Smith")
knowledge.set("occupation", "Banker")
# each of these will not be encased because they are not strings
knowledge.set("age", 43)
knowledge.set("money", 553200.50)
MADARA transports can be configured to trust or ban lists of peers. The banned list can be useful for mitigating known faulty sensors that are flooding the network as well as enforcing basic security. The accept list is more useful for security policies and is significantly more stringent. Once a node is added to the trusted list, anything not on the trusted list will be automatically denied any update abilities to the local context.
Example of adding peers to trusted list
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# only trust the agents "agent2" and "agent3"
settings = transport.QoSTransportSettings()
settings.add_trusted_peer("agent2")
settings.add_trusted_peer("agent3")
# others could add "agent1" to their trusted list
knowledge = engine.KnowledgeBase("agent1", settings)
The above code has agent1 set its unique identifier to "agent1". Let's show how we could make a peer that doesn't trust knowledge from the above entity.
Example of adding peers to banned list
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
# Do not trust "agent1"
settings = transport.QoSTransportSettings()
settings.add_banned_peer ("agent1")
# others could add "agent2" to their trusted list
knowledge = engine.KnowledgeBase("agent2", settings)
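The way the two lists interact can be summarized as a small decision function. This is a sketch in plain Python, not MADARA's implementation: `accepts_update` is a hypothetical helper, and giving the banned list precedence when a peer appears on both lists is an assumption.

```python
def accepts_update(originator, trusted=(), banned=()):
    """Decide whether an update from originator may modify the local context."""
    if originator in banned:
        # assumption: a ban wins even if the peer is also trusted
        return False
    if trusted:
        # once any peer is trusted, everyone else is denied
        return originator in trusted
    # with no lists configured, all peers are accepted
    return True
```

This makes the difference in strictness visible: a banned list blocks only the named peers, while a non-empty trusted list blocks every peer that is not named.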
Before deploying MADARA applications into real-world, real-time situations (especially wireless situations), developers should probably test that their applications work despite packet loss. MADARA provides first class support for dropping packets at deterministic and random intervals.
The deterministic policy is enforced via a stride scheduler within the Packet Scheduler.
Example of setting a deterministic non-bursty packet drop policy
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
settings = transport.QoSTransportSettings()
# Set a drop rate of 20% in a deterministic manner (1 drop, then 4 successful sends)
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_DETERMINISTIC)
knowledge = engine.KnowledgeBase("", settings)
Burst usage
When updating a drop rate, you can specify a drop burst type. Bursts are sequences of drops in a row that may prove useful when trying to mimic real world drop patterns. Burst rates can cause unusual patterns within the deterministic policy settings that may cause a higher rate than intended.
Example of setting a deterministic bursty packet drop policy
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
settings = transport.QoSTransportSettings()
# Set a drop rate of 20% in a deterministic manner with 2 successive drops (burst rate = 2)
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_DETERMINISTIC, 2)
knowledge = engine.KnowledgeBase("", settings)
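The effect of the drop rate and burst parameters on a deterministic policy can be previewed with a simplified model. The sketch below is not the stride scheduler used by MADARA. The `drop_pattern` function is hypothetical and simply places each burst of drops at the start of a fixed-length cycle.

```python
def drop_pattern(drop_rate, burst=1, count=10):
    """Return a list of booleans, True where a packet is dropped.

    Simplified deterministic model: each cycle starts with `burst`
    consecutive drops, followed by enough sends to match drop_rate.
    """
    if drop_rate <= 0:
        return [False] * count
    cycle = max(burst, round(burst / drop_rate))
    return [(i % cycle) < burst for i in range(count)]
```

With a rate of 0.2 and no bursts, this model drops the first of every five packets. With a burst of 2 it drops two packets and then sends eight, so a receiver that only observes the first five packets sees a 40% loss even though the long-run rate is still 20%. This is one way a bursty setting can look like a higher rate than intended over short intervals.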
The probabilistic policy enforces a uniform distribution of drops at a target rate.
Normal usage
The following example shows how to set a 20% drop rate in a probabilistic manner without scheduled bursts.
Example of setting a probabilistic non-bursty packet drop policy
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
settings = transport.QoSTransportSettings()
# Set a drop rate of 20% in a probabilistic manner
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_PROBABLISTIC)
knowledge = engine.KnowledgeBase("", settings)
Burst usage
When updating a drop rate, you can specify a drop burst type. Bursts are sequences of drops in a row that may prove useful when trying to mimic real world drop patterns.
Example of setting a probabilistic bursty packet drop policy
# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport
settings = transport.QoSTransportSettings()
# Set a drop rate of 20% in a probabilistic manner with 2 successive drops (burst rate = 2)
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_PROBABLISTIC, 2)
knowledge = engine.KnowledgeBase("", settings)
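A probabilistic policy with bursts can also be simulated offline to check what loss an application should expect. The following sketch is a simplified model and is not taken from the MADARA source: `simulate_drops` is hypothetical, and the formula for the probability of starting a burst is chosen so that the long-run drop fraction matches the requested rate.

```python
import random


def simulate_drops(drop_rate, burst=1, count=50000, seed=0):
    """Return a list of booleans, True where a packet is dropped.

    burst must be at least 1. Each burst drops `burst` packets in a row.
    """
    rng = random.Random(seed)
    # probability of starting a burst so the long-run rate equals drop_rate
    start = drop_rate / (burst * (1.0 - drop_rate) + drop_rate)
    dropped = []
    while len(dropped) < count:
        if rng.random() < start:
            dropped.extend([True] * burst)
        else:
            dropped.append(False)
    return dropped[:count]
```

With a burst of 1 the start probability equals the drop rate. With larger bursts, fewer bursts are started, but each one removes several consecutive packets, so the overall fraction of dropped packets stays close to the target while the losses become clustered.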
Mutated knowledge is aggregated and sent over the network transports whenever you call KnowledgeBase.set, KnowledgeBase.evaluate, KnowledgeBase.wait, or KnowledgeBase.send_modifieds. This can be a bit non-intuitive for developers of large applications who just want to set many variables and then send everything at once. You can delay sending modifieds by setting the delay_sending_modifieds member to true when providing the madara.knowledge.EvalSettings or madara.knowledge.WaitSettings classes to the set, evaluate, and wait functions on the madara.knowledge.KnowledgeBase.
The preferred way to aggregate knowledge, especially in a larger application, is to use Knowledge Containers. Containers are object-oriented abstractions that point to a specific variable inside of the KnowledgeBase. Most of these Containers provide extremely fast, O(1) lookups, and they also suppress sends over the network during set-like operations. Basically, using Containers, you can quickly set/get values and build knowledge aggregations without sending over the network. When you're ready to send, you call KnowledgeBase.send_modifieds. See the Containers wiki for more information and examples.
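The difference between sending on every set and sending one aggregate can be shown with a toy stand-in for a knowledge base. The `BatchingStore` class below is hypothetical and does not use MADARA. It only mimics the idea of a list of modified variables that is flushed either immediately or on an explicit `send_modifieds` call.

```python
class BatchingStore:
    """Toy stand-in for a knowledge base that batches modified variables."""

    def __init__(self, send):
        self.values = {}
        self.modified = {}
        self.send = send  # callable that receives a dict of updates

    def set(self, name, value, delay_sending_modifieds=False):
        """Store a value and, unless delayed, send all pending updates."""
        self.values[name] = value
        self.modified[name] = value
        if not delay_sending_modifieds:
            self.send_modifieds()

    def send_modifieds(self):
        """Send every pending update as a single aggregate packet."""
        if self.modified:
            self.send(dict(self.modified))
            self.modified.clear()


packets = []
store = BatchingStore(packets.append)
```

In this model, three immediate sets produce three packets, while three delayed sets followed by one `send_modifieds` call produce a single packet that carries all three variables.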
The Python module and all submodules, classes, and functions are documented to work with the Python help system. To start browsing the inline help, try the following inside of a python interpreter:
import madara
help (madara)
help (madara.knowledge)
If you are looking for C++ code examples and guides to supplement Python knowledge, your best bet would be to start with the tutorials (located in the tutorials directory of the MADARA root directory--see the README.txt file for descriptions). After that, there are many dozens of tests that showcase and profile the many functions, classes, and functionalities available to MADARA users.
Users may also browse the Library Documentation for all MADARA functions, classes, etc. and the Wiki pages on this website.