Skip to content

robbrit/tx-easy-pika

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Description

A wrapper around pika's Twisted connection to make it simpler to use.

Main assumptions:

  • All messages are JSON. Non-JSON messages log an error and are acked immediately. You can publish non-JSON messages, any non-strings that are published are converted to JSON strings.
  • All you really want to do is consume and publish.

Installation

Use pip:

pip install tx-easy-pika

Usage

Example

from datetime import datetime
from txeasypika import QueueConnection
from twisted.internet import reactor


def callback(channel, tag, message):
    ''' This function will be called whenever the server sends us a message. '''
    print "Received a message:"
    print message
    channel.basic_ack(tag)

queues = QueueConnection()

# Create an exchange
queues.exchange_declare(exchange="test-exchange")

# Set up a listener on a queue
queues.bind("my-queue", "test-exchange", "test-routing-key", callback)

# Now publish some messages to that queue
# Note that this message will not be published until we have started the
# Twisted reactor and we have successfully connected to Rabbit.
for i in range(5):
    queues.basic_publish("test-exchange", "test-routing-key", {
        "value": i,
        "message_data": "This is a message",
        "nested_field": {
            "an_array": ["this has an array"],
            "a_date": datetime.now()
        }
    })

# Start up the reactor
reactor.run()

QueueConnection

Class to represent a connection to the queues.

init()

Construct a new connection to the queues. Note that this is based on Twisted, so no connection is actually made until the reactor is running.

Arguments:

  • host (str) - The server that RabbitMQ is on.
  • port (str) - The port that RabbitMQ is listening on.
  • username (str) - Username for RabbitMQ.
  • password (str) - Password for RabbitMQ.
  • heartbeat (int) - Heartbeat frequency in seconds for RabbitMQ.
  • prefetch_count (int) - How many messages to prefetch for this connection.
  • log_level (logging log level) - Logging level for Pika

basic_publish()

Publish a message to the queues. This is a deferred method, the actual call will be made when the system has successfully connected to the AMQP server.

Arguments:

  • exchange (str or unicode sequence of these characters: letters, digits, hyphen, underscore, period, or colon.) - The exchange to publish to.
  • routing_key (str) - The routing key to bind on.
  • message (anything) - The message to send. If this is not a string it will be serialized as JSON, and the properties.content_type field will be forced to be application/json.
  • properties (dict) - Dict of AMQP message properties.
  • mandatory (bool) - AMQP Mandatory flag.
  • immediate (bool) - AMQP Immediate flag.

exchange_declare()

Declare an exchange. This is a deferred method, the actual call will be made when the system has successfully connected to the AMQP server.

If passive set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not and if the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found).

Parameters:

  • callback (method) – Call this method on Exchange.DeclareOk
  • exchange (str or unicode sequence of these characters: letters, digits, hyphen, underscore, period, or colon.) – The exchange name consists of a non-empty
  • exchange_type (str) – The exchange type to use
  • passive (bool) – Perform a declare or just check to see if it exists
  • durable (bool) – Survive a reboot of RabbitMQ
  • auto_delete (bool) – Remove when no more queues are bound to it
  • internal (bool) – Can only be published to by other exchanges
  • nowait (bool) – Do not expect an Exchange.DeclareOk response
  • arguments (dict) – Custom key/value pair arguments for the exchange

bind()

Declare a queue, create a binding on it. Optionally attach a function to call. Note that the function will be called once for each call to bind()

  • queue_name (str) - The name of the queue we want to declare.
  • exchange (str) - The exchange of the initial binding on the new queue.
  • routing_key (str) - The routing key of the initial binding on the new queue.
  • callback (function) - A function to call when a message is received on this queue. Arguments are (channel object, delivery tag, JSON-parsed message).
  • arguments (dict) - AMQP Arguments for declaring the queue.
  • no_ack (bool) - Whether this consumer will ack or not. Ignored if callback is None.

close()

Close the connection to the AMQP server.

About

Nice and simple library for using AMQP with Twisted.

Resources

Stars

Watchers

Forks

Packages

No packages published