PythonInteractingWithTheTransport

James Edmondson edited this page Sep 9, 2018 · 12 revisions

Python Guide Series
Architecture | Knowledge Base | Networking | Containers


MADARA Transport Layer

The main entry point into the MADARA transport layer is the QoSTransportSettings class. The QoSTransportSettings class contains dozens of network-specific settings that govern basic configuration, quality-of-service, and filtering.


Understanding the QoS Transport Settings class


1. Creating Networked Transports

Creating a networked transport in MADARA generally requires setting QoSTransportSettings.type (which specifies the network transport type) and providing a list of hosts (for UDP, multicast, and broadcast) or domains (which map to topics in DDS transports).

UDP requires a hosts vector with the ip:port of the sender first (where we want to bind and how we want to be known to others in the originator field of messages) and all peers in the rest of the vector. Multicast requires a valid multicast ip and port in the first entry of the hosts vector. Similarly, broadcast requires a valid broadcast ip (for the subnet of your machine) and port in the first entry of the hosts vector.

Example of UDP transport constructor

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings for a UDP transport.
# The first host is our own bind address; the rest are peers.
settings = transport.QoSTransportSettings()
settings.hosts.append("127.0.0.1:45000")
settings.hosts.append("127.0.0.1:45001")
settings.type = transport.TransportTypes.UDP

# create a knowledge base with the UDP transport settings
knowledge = engine.KnowledgeBase("agent1", settings)

# set our id to 0 and let the other agent know that we are ready
knowledge.set(".id", 0)
knowledge.set("agent{.id}.ready", 1)

Example of multicast transport constructor (Multicast IP range)

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST

# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("agent1", settings)

# set our id to 0 and let the other agent know that we are ready
knowledge.set(".id", 0)
knowledge.set("agent{.id}.ready", 1)

Example of broadcast transport constructor

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings for a broadcast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("192.168.1.255:15000")
settings.type = transport.TransportTypes.BROADCAST

# create a knowledge base with the broadcast transport settings
knowledge = engine.KnowledgeBase("agent1", settings)

# set our id to 0 and let the other agent know that we are ready
knowledge.set(".id", 0)
knowledge.set("agent{.id}.ready", 1)

2. Rebroadcasting

When using real robots in a wide area, it may not be possible to reach all neighbors in a single message send. To aid developers trying to solve problems where multi-hops are required for message routing, MADARA supports rebroadcast TTLs set on the sender side. MADARA also supports opting out of rebroadcasts. In fact, all transports opt out by default.

[Figure: Example routing between agent1 and agent3 that is not possible without rebroadcasts]
[Figure: Routing packets with Rebroadcast TTL]
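The multi-hop idea can be sketched in plain Python without MADARA. This is a simplified model, not the library's routing code: it assumes each rebroadcast lowers the TTL by one and that an agent forwards a message only if it has opted in. The `reachable` function, the `neighbors` map, and the `participants` set are hypothetical names used only for illustration.

```python
from collections import deque

def reachable(neighbors, origin, rebroadcast_ttl, participants):
    """Return the agents that receive a message sent by origin (toy model)."""
    received = set()
    frontier = deque([(origin, rebroadcast_ttl)])
    while frontier:
        sender, ttl = frontier.popleft()
        for peer in neighbors[sender]:
            if peer == origin:
                continue
            first_time = peer not in received
            received.add(peer)
            # assumption: a peer forwards only if TTL remains and it opted in
            if first_time and ttl > 0 and peer in participants:
                frontier.append((peer, ttl - 1))
    return received

# agent1 can reach agent2 directly, but agent3 only through agent2
neighbors = {
    "agent1": ["agent2"],
    "agent2": ["agent1", "agent3"],
    "agent3": ["agent2"],
}

no_ttl = reachable(neighbors, "agent1", 0, {"agent2"})
with_ttl = reachable(neighbors, "agent1", 1, {"agent2"})
opted_out = reachable(neighbors, "agent1", 3, set())
```

In this model, agent3 is reached only when the sender sets a TTL of at least 1 and agent2 participates in rebroadcasts; a large TTL has no effect if every intermediate agent has opted out.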

Example of sending a packet with a rebroadcast ttl of 3

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings
settings = transport.QoSTransportSettings()

# set rebroadcast ttl to 3
settings.set_rebroadcast_ttl(3)

# create a knowledge base with the transport settings
knowledge = engine.KnowledgeBase("", settings)

Example of enabling participation in rebroadcasts

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings
settings = transport.QoSTransportSettings()

# enable participation in rebroadcasts up to 255 ttl
settings.enable_participant_ttl(255)

# Our own packets will have a ttl of 2
settings.set_rebroadcast_ttl(2)

knowledge = engine.KnowledgeBase("", settings)

3. Basic bandwidth and deadline enforcement

The MADARA architecture monitors two types of bandwidth usage: sending and receiving. Setting a limit for either serves as a hard limit that does not differentiate between priorities of information. Once a packet is sent, the bandwidth counters are updated, and if the bytes-per-second rate is higher than the limit you set, all packets are dropped until the rate dips below the limit. These limiters do not currently look at the size of the packet being sent, so if you are at 99,500 B/s and your limit is 100,000 B/s, the transport will send the next packet (even if it is 1 MB), update the bandwidth counter, and then send nothing more until the rate measured over the past 10 s falls back below the limit.
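The behavior described above can be illustrated with a small plain-Python model. This is a sketch under stated assumptions, not MADARA's implementation: `SendLimiter` is a hypothetical class, the rate is taken as bytes sent in the last 10 seconds divided by 10, and the check happens before a send without looking at the size of the packet.

```python
from collections import deque

class SendLimiter:
    """Toy hard limiter that checks the rate before a send, not the packet size."""

    def __init__(self, limit_bytes_per_sec, window_secs=10):
        self.limit = limit_bytes_per_sec
        self.window = window_secs
        self.sent = deque()  # (timestamp, size) of packets still in the window

    def rate(self, now):
        # forget packets that have aged out of the window
        while self.sent and self.sent[0][0] <= now - self.window:
            self.sent.popleft()
        return sum(size for _, size in self.sent) / self.window

    def try_send(self, now, size):
        if self.rate(now) > self.limit:
            return False  # over the limit: drop the packet
        self.sent.append((now, size))
        return True

limiter = SendLimiter(limit_bytes_per_sec=100000)
limiter.try_send(now=0, size=995000)           # window was empty, so this is sent
first = limiter.try_send(now=1, size=1000000)  # rate is 99,500 B/s: the 1 MB packet is sent
second = limiter.try_send(now=2, size=100)     # rate is now 199,500 B/s: dropped
later = limiter.try_send(now=11, size=100)     # earlier packets have aged out: sent
```

The second call shows why a single large packet can push the measured rate well past the limit: the limiter only reacts on the following send.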

For a more flexible bandwidth option that you can configure, see Bandwidth Filters.

Deadline enforcement is concerned with enforcing latency deadlines between reasoning entities on the network. To be useful to the MADARA entities, deadline enforcement requires some type of time synchronization protocol between agents in the network, preferably accurate to within a second.


3.a. Bandwidth enforcement

Enforcing send bandwidth limits

This type of enforcement is useful if you want to make sure no agent is using more than a certain limit (e.g., 100KB/s) individually.

Example of enforcing a send bandwidth limit

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST

# set the send bandwidth limit to 100,000 bytes per second.
settings.set_send_bandwidth_limit(100000)

# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)

Enforcing send bandwidth limits based on received bandwidth

This type of enforcement is useful if you want to make sure the agent is not violating a collective bandwidth limit (e.g., 2MB/s) and is based on the amount of data received per second over the past 10s.

Example of enforcing a total bandwidth limit

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST

# set the total bandwidth limit to 100,000 bytes per second.
settings.set_total_bandwidth_limit(100000)

Note that even though this is keying off received bandwidth, it affects messages being sent (i.e., enforcement is applied when the agent attempts to send or rebroadcast knowledge to the network).


3.b. Deadline enforcement

Deadline enforcement aims to discard received packets that are too old to be useful to agent state reasoning. Handling old packets can significantly impede performance in an agent-saturated network, so clearing your queues quickly can aid the agent network.

Example of enforcing a transport latency deadline

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST

# set the latency deadline to 10 (seconds)
settings.set_deadline(10)

4. Filters

User callbacks can also be inserted into the transport layer to modify payloads. The MADARA transport system allows users to insert callbacks into three key operations: receive, send, and rebroadcast. Filter callbacks are given a number of arguments that are relevant to the filtering operation. As of version 1.1.9, these arguments include the following:

  • args[0]: The knowledge record that the filter is acting upon
  • args[1]: The name of the knowledge record, if applicable ("" if unnamed, but this should never happen)
  • args[2]: The type of operation calling the filter (integer valued). Valid types are: madara.transport.TransportContext.OperationTypes.IDLE_OPERATION (should never see), madara.transport.TransportContext.OperationTypes.SENDING_OPERATION (transport is trying to send the record), madara.transport.TransportContext.OperationTypes.RECEIVING_OPERATION (transport has received the record and is ready to apply the update), madara.transport.TransportContext.OperationTypes.REBROADCASTING_OPERATION (transport is trying to rebroadcast the record -- only happens if rebroadcast is enabled in Transport Settings)
  • args[3]: Bandwidth used while sending through this transport, measured in bytes per second.
  • args[4]: Bandwidth used while receiving from this transport, measured in bytes per second.
  • args[5]: Message timestamp (when the message was originally sent, in seconds)
  • args[6]: Current timestamp (the result of time (NULL))
  • args[7]: Knowledge Domain (partition of the knowledge updates)
  • args[8]: Originator (Source ID of the sender)
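The argument layout listed above can be wrapped in named fields so that a filter reads `a.send_bandwidth` rather than `args[3]`. The sketch below is plain Python and does not import MADARA; `FilterArgs` and `unpack_filter_args` are hypothetical helper names, and the sample values are plain integers and strings, whereas a real filter receives knowledge records and would call methods such as `to_integer()` on them.

```python
from collections import namedtuple

# field order mirrors the argument indices 0 through 8 listed above
FilterArgs = namedtuple("FilterArgs", [
    "record", "name", "operation",
    "send_bandwidth", "receive_bandwidth",
    "message_time", "current_time",
    "domain", "originator",
])

def unpack_filter_args(args):
    """Map positional filter arguments to named fields, padding with None."""
    padded = list(args[:9]) + [None] * (9 - len(args))
    return FilterArgs(*padded)

# stand-in values for illustration only
sample = ["<record>", "agent0.ready", 1, 500, 700, 100, 103, "", "agent0"]
a = unpack_filter_args(sample)
latency = a.current_time - a.message_time
short = unpack_filter_args(["<record>"])
```

Padding with `None` keeps the helper safe if a filter is ever called with fewer arguments than expected.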

The filter can add data to the payload by pushing a variable name (string) followed by a value, which can be a double, string, integer, byte array, or array of integers or doubles, just as you would do with a set operation. This can be useful if other reasoners in the network are expecting additional meta data for the update (which they are free to strip out or ignore in a receive filter, if they don't need the information).

The filter can also access any variable in the madara.knowledge.KnowledgeBase through the madara.knowledge.Variables facade. With the arguments and variables interfaces, developers can respond to transport events in highly dynamic and extensible ways.


4.a. Bandwidth Filters

To create a bandwidth filter, we simply need to create a filter that looks at argument index 3 (how much bandwidth we have used sending) or argument index 4 (how much bandwidth is being used--i.e., what we have received in bytes per second), depending on which type of bandwidth enforcement we want to do.

In the following example, we drop any packet members if we are over 100,000 bytes per second sending within the past 10s.

Example of enforcing a send bandwidth limit via filtering

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Do not allow more than 100k bytes per second
def enforce_send_limit(args, variables):
  # by default, we return an empty record, which means remove it
  result = engine.KnowledgeRecord()

  # args[3] is the send bandwidth used, in bytes per second
  if len(args) > 3:
    # only return args[0] (keep the record) if we are under 100KB/s
    if args[3].to_integer() < 100000:
      result = args[0]

  return result

# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST

# add the above filter for all value types, applied before sending
# and before rebroadcasting
settings.add_send_filter(engine.KnowledgeRecord.ValueTypes.ALL_TYPES,
                         enforce_send_limit)
settings.add_rebroadcast_filter(engine.KnowledgeRecord.ValueTypes.ALL_TYPES,
                                enforce_send_limit)

# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)

4.b. Deadline Filters

Filters can inspect argument indices 5 and 6 for information on the sent and received time for each packet. The difference between these two arguments is called the packet latency, and this latency value can inform the filter of deadline violations.

Example of enforcing a network latency deadline via filtering

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

def filter_deadlines(args, variables):
  # by default, we return an empty record, which means remove it
  result = engine.KnowledgeRecord()

  if len(args) >= 7:
    # args[5] is sent time, args[6] is current time
    # keep any packet with a latency of less than 5 seconds
    if args[6].to_integer() - args[5].to_integer() < 5:
      result = args[0]

  return result

# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST

# add the above filter for all value types, applied before sending
settings.add_send_filter(engine.KnowledgeRecord.ValueTypes.ALL_TYPES,
                         filter_deadlines)

# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)

4.c. Payload Shaping Filters

Filters that drop packets can be very useful, but developers also have unlimited options for inflating or reducing packets and updates within the packets. Packet shaping is any operation that mutates a packet element into a different form. This can be operations like converting packets into XML, resizing an image payload, or even encrypting part of a packet with a private key.

In the following example, we simply encapsulate any string payload with the xml elements <item> and </item>.

Example of shaping a payload before it gets sent out

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

def add_item_tag(args, variables):
  # by default, we return an empty record, which means remove it
  result = engine.KnowledgeRecord()

  # if we have an arg and it is a string value
  if len(args) > 0 and args[0].is_string_type():
    # encase it in an <item> tag
    encased = "<item>"
    encased += args[0].to_string()
    encased += "</item>"

    result.set_value(encased)

  return result

# Create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST

# add the above filter to string types
settings.add_send_filter(engine.KnowledgeRecord.ValueTypes.STRING, add_item_tag)

# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("", settings)

# each of these will be encased in an item tag after filtering
knowledge.set("name", "John Smith")
knowledge.set("occupation", "Banker")

# each of these will not be encased because they are not strings
knowledge.set("age", 43)
knowledge.set("money", 553200.50)

5. Trusted and Banned Peer Lists

MADARA transports can be configured with lists of trusted or banned peers. The banned list can be useful for mitigating known faulty sensors that are flooding the network as well as for enforcing basic security. The trusted list is more useful for security policies and is significantly more stringent: once any node is added to the trusted list, every peer not on the list is automatically denied the ability to update the local context.
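The acceptance rule described above can be sketched in plain Python. This is an illustrative model rather than the library's code: `is_trusted` is a hypothetical function, and it assumes the trusted list takes precedence whenever it is non-empty, with the banned list consulted only otherwise.

```python
def is_trusted(peer, trusted_peers, banned_peers):
    """Decide whether updates from peer are applied to the local context."""
    if trusted_peers:
        # a non-empty trusted list denies everyone who is not on it
        return peer in trusted_peers
    # assumption: with no trusted list, only banned peers are denied
    return peer not in banned_peers

on_list = is_trusted("agent2", {"agent2", "agent3"}, set())
off_list = is_trusted("agent4", {"agent2", "agent3"}, set())
banned = is_trusted("agent1", set(), {"agent1"})
unlisted = is_trusted("agent5", set(), {"agent1"})
```

The asymmetry is the point: a banned list blocks named peers and admits everyone else, while a trusted list admits named peers and blocks everyone else.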

Example of adding peers to trusted list

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# only trust the agents "agent2" and "agent3"
settings = transport.QoSTransportSettings()
settings.add_trusted_peer("agent2")
settings.add_trusted_peer("agent3")

# others could add "agent1" to their trusted list
knowledge = engine.KnowledgeBase("agent1", settings)

The above code has agent1 set its unique identifier to "agent1". Next, we show how to create a peer that does not trust knowledge from the above entity.

Example of adding peers to banned list

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

# Do not trust "agent1"
settings = transport.QoSTransportSettings()
settings.add_banned_peer ("agent1")

# this agent identifies itself as "agent2" and ignores updates from "agent1"
knowledge = engine.KnowledgeBase("agent2", settings)

6. Simulating Packet Loss

Before deploying MADARA applications into real-world, real-time situations (especially wireless situations), developers should probably test that their applications work despite packet loss. MADARA provides first class support for dropping packets at deterministic and random intervals.


6.a. Deterministic Packet Loss

The deterministic policy is enforced via a stride scheduler within the Packet Scheduler.
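To see what a deterministic 20% drop pattern looks like, the following plain-Python sketch produces one drop followed by four sends, repeating. It is a simplified credit-based model and not MADARA's stride scheduler; `deterministic_pattern` is a hypothetical name, and burst settings are not modeled.

```python
from fractions import Fraction

def deterministic_pattern(drop_rate, count):
    """Return a list of 'drop'/'send' decisions at a fixed drop rate."""
    # exact arithmetic avoids floating-point drift across many packets
    rate = Fraction(drop_rate).limit_denominator(1000)
    credit = Fraction(1)  # start with a full credit so each cycle opens with a drop
    pattern = []
    for _ in range(count):
        if credit >= 1:
            pattern.append("drop")
            credit -= 1
        else:
            pattern.append("send")
        credit += rate
    return pattern

pattern = deterministic_pattern(0.2, 10)
drops_in_100 = deterministic_pattern(0.2, 100).count("drop")
```

Because the pattern is fixed, a test that relies on it is repeatable from run to run, which is the main reason to prefer a deterministic policy when debugging.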

Example of setting a deterministic non-bursty packet drop policy

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

settings = transport.QoSTransportSettings()
# Set a drop rate of 20% in a deterministic manner (1 drop, then 4 successful sends)
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_DETERMINISTIC)

knowledge = engine.KnowledgeBase("", settings)

Burst usage

When updating a drop rate, you can also specify a drop burst size. Bursts are sequences of consecutive drops that may prove useful when trying to mimic real-world drop patterns. Within the deterministic policy, burst settings can produce unusual patterns that result in a higher drop rate than intended.

Example of setting a deterministic bursty packet drop policy

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

settings = transport.QoSTransportSettings()
# Set a drop rate of 20% in a deterministic manner with 2 successive drops (burst rate = 2)
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_DETERMINISTIC, 2)

knowledge = engine.KnowledgeBase("", settings)

6.b. Probabilistic Packet Loss

The probabilistic policy enforces a uniform distribution of drops at a target rate.
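As a rough illustration of a random drop policy, the sketch below drops each packet independently with the target probability. It is plain Python with a hypothetical `probabilistic_pattern` function and an optional seed for repeatability; it is an assumption about the general technique, not a description of how MADARA draws its random numbers.

```python
import random

def probabilistic_pattern(drop_rate, count, seed=None):
    """Return 'drop'/'send' decisions, each drawn independently at drop_rate."""
    rng = random.Random(seed)
    return ["drop" if rng.random() < drop_rate else "send"
            for _ in range(count)]

pattern = probabilistic_pattern(0.2, 10000, seed=1)
observed = pattern.count("drop") / len(pattern)
```

Over many packets the observed rate approaches the target, but any short run can contain more or fewer drops than the target suggests, and consecutive drops occur naturally even without a burst setting.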

Normal usage

The following example shows how to set a 20% drop rate in a probabilistic manner without scheduled bursts.

Example of setting a probabilistic non-bursty packet drop policy

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

settings = transport.QoSTransportSettings()

# Set a drop rate of 20% in a probabilistic manner
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_PROBABLISTIC)

knowledge = engine.KnowledgeBase("", settings)

Burst usage

When updating a drop rate, you can specify a drop burst type. Bursts are sequences of drops in a row that may prove useful when trying to mimic real world drop patterns.

Example of setting a probabilistic bursty packet drop policy

# import the submodules and give them shorter aliases
import madara.knowledge as engine
import madara.transport as transport

settings = transport.QoSTransportSettings()

# Set a drop rate of 20% in a probabilistic manner with 2 successive drops (burst rate = 2)
settings.update_drop_rate(.2, transport.DropTypes.PACKET_DROP_PROBABLISTIC, 2)

knowledge = engine.KnowledgeBase("", settings)

Understanding When Knowledge is Sent Over the Network

Mutated knowledge is aggregated and sent over the network transports whenever you call KnowledgeBase.set, KnowledgeBase.evaluate, KnowledgeBase.wait, or KnowledgeBase.send_modifieds. This can be non-intuitive for developers of large applications who just want to set many variables and then send everything at once. You can delay sending modifieds by setting the delay_sending_modifieds member to true on the madara.knowledge.EvalSettings instance that you pass to the set, evaluate, and wait functions of the madara.knowledge.KnowledgeBase.

The preferred way to aggregate knowledge, especially in a larger application, is to use Knowledge Containers. Containers are object-oriented abstractions that point to a specific variable inside the KnowledgeBase. Most Containers provide extremely fast, O(1) lookups, and they also suppress sends over the network during set-like operations. Using Containers, you can quickly set and get values and build knowledge aggregations without sending over the network. When you are ready to send, you call KnowledgeBase.send_modifieds. See the Containers wiki for more information and examples.
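The difference between sending on every set and sending one aggregated update can be shown with a toy model. `ToyKnowledgeBase` below is a hypothetical stand-in written in plain Python, not the MADARA class; it only counts how many sends would occur, and its `delay_sending_modifieds` keyword is a simplification of the settings member of the same name.

```python
class ToyKnowledgeBase:
    """Toy stand-in (not the MADARA class) that records aggregated sends."""

    def __init__(self):
        self.values = {}
        self.modified = set()
        self.packets = []  # each entry represents one send over the network

    def set(self, name, value, delay_sending_modifieds=False):
        self.values[name] = value
        self.modified.add(name)
        if not delay_sending_modifieds:
            self.send_modifieds()

    def send_modifieds(self):
        # send every modified variable together, then clear the modified set
        if self.modified:
            self.packets.append(
                {name: self.values[name] for name in sorted(self.modified)})
            self.modified.clear()

# one send per set call
eager = ToyKnowledgeBase()
for i in range(3):
    eager.set("var%d" % i, i)

# all three changes go out in a single aggregated send
batched = ToyKnowledgeBase()
for i in range(3):
    batched.set("var%d" % i, i, delay_sending_modifieds=True)
batched.send_modifieds()
```

In this model the eager knowledge base produces three sends and the batched one produces a single send carrying all three variables.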

More Information

The Python module and all submodules, classes, and functions are documented to work with the Python help system. To start browsing the inline help, try the following inside of a python interpreter:

import madara
help (madara)
help (madara.knowledge)

If you are looking for C++ code examples and guides to supplement Python knowledge, your best bet would be to start with the tutorials (located in the tutorials directory of the MADARA root directory--see the README.txt file for descriptions). After that, there are many dozens of tests that showcase and profile the many functions, classes, and functionalities available to MADARA users.

Users may also browse the Library Documentation for all MADARA functions, classes, etc. and the Wiki pages on this website.

