Skip to content

loggly/muskrat

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

90 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Muskrat

A python producer-consumer library that provides a persistent multicasting message queue with a simple interface. Built ontop of S3 to provide persistnet message queueing that does not require installation of broker software. Muskrat also allows experimental support of using multiple brokers in a tee like fashion.

Currently the following brokers are supported:

s3
RabbitMQ (experimental)

Using multiple brokers allows Producers to 'tee' all messages. As an example, a message can be tee'd to write to both RabbitMQ for high-throughput and s3 for message persistence/replay.

###Message Structure

####Routing Keys

Routing keys allow the producer to specify where the message is to be stored in the broker. These routing keys can then be subscribed to by multiple consumers and messages can be proccessed independently by the consumer endpoints.

Routing Keys are strings separated by '.' ala:

    Frontend.Customer.Signup 
    Chatserver.General
    Job.Queue.Command

####Message Bodies

Message bodies are simply just strings. The producer library implements a JSON send as a convenience for sending objects.

Routing Key 

    Frontend.Customer.Signup

Message
    
    'Customer signed up!'


    '{
        "id":48239,
        "email":"test@loggly.com",
        "phone":"(555)555-5555",
        "subdomain":"logglytest",
        "company":"loggly",
        "username":"awesome_logger"
        "subscription": {
            "volume":"200"
            "retention":"7"
            "rate":"0.0"
        }
    }'

###Producers

Producers write messages to brokers to be distributed.

#####S3Producers

S3 Producers are blocking writers that upload the supplied message to S3 for the associated routing key. the following is an example of using an S3Producer.

from muskrat.producer import S3Producer

p = S3Producer( routing_key = 'Frontend.Customer.Signup' )
p.send( 'I am producing a message to s3!' )
p.send_json({
    'email':'test@loggly.com',
    'company':'loggly' 
})

#####ThreadedS3Producer

An asynchronous write interface to S3. These producers allows the creation of a thread pool of a specified size that to can issue simultaneous HTTP requests to S3. Used to speedup multiple contiguous writes as the producer will not block on the outbound network IO. Using this producer will eventually block until all messages have been processed regardless of the execution state of the main thread. This ensures that each message will be given at least one shot at being written.

Example:

from muskrat.producer import ThreadedS3Producer

p = ThreadedS3Producer( routing_key = 'ThreadTest.Messages', num_threads=100  )
for x in range( 1000 ):
    p.send_json( { 'message':x } )

#####RabbitMQ Producers (experimental)

Utilizes RabbitMQ as a message queueing/broker service. Does not guarantee indefinite message persistence or lifecycle polcies.

#####General Producers

General Producers can write to multiple brokers at one time. The general producer defaults to use only an S3Producer if no other Producer objects are supplied.

from muskrat.producer import Producer

p = Producer( routing_key='Chatserver.General' )
p.send( 'Welcome to General Chat!' )

###Consumers

#####S3 Consumers

Consumers receive messages from the brokers in chronological order and do work with them. They also allow automatic re-binding to a specific message queue across runs. The consumer must be instantiated for the corresponding message broker, ala:

p = Producer( routing_key='Simple.Message.Queue' )
p.send( 'This is a simple producer-consumer pair' )
from muskrat.s3consumer import Consumer

@Consumer( 'Simple.Message.Queue' )
def consume_messages( msg ):
    print msg

consume_messages.consumer.consume()

stdout >> This is a simple producer-consumer pair

Consumers are just functions that know what to do with the messages sent to a routing key. The above example will consume as many messages are on the queue when consume call was issued.

The @Consumer decorator is a convenience that allows the consumer object to be bound directly to the function. The function can still be called directly if desired (useful for testing).

Using the decorator is equivalent to:

from muskrat.s3consumer import S3Consumer

s3consumer = S3Consumer( 'Simple.Message.Queue', consume_messages )
s3consumer.consume()

#####S3 Cursor

S3 consumers need to track their own cursor. In order to do so they use a simple routing_key + timestamp of the message format. By default, the cursor is written to a file defined by __module__.consumer_function_name in the cursors folder of the muskrat package. This allows muskrat to pick up and and continue processing messages starting where it last stopped. Manipulating the cursor also allows for replay of messages or the ability to skip messages.

Cursors are married to their consumer functions for automatic re-binding. For the function, consume_messages run via the __main__ module the cursor would be stored in <path to muskrat install>/muskrat/cursors/__main__.consume_messages. The file would contain one line consisting of the current state of the consumer as it is proccessing messages. If the consume_message is subscribed to the routing key Chatserver.General then the cursor file contents would be something akin to CHATSERVER/GENERAL/2013-01-18T12:23:13.894895

###Config

Configuration settings are defined in a python file, python object, or dict. If the config is defined via a python file the module level variable CONFIG, which is mapped to the producer or consumer object upon creation, must be defined. By default, muskrat attempts to load config.py of muskrat/config.py.

import os

class Config(object):
    s3_timestamp_format = '%Y-%m-%dT%H:%M:%S.%f' #Timestamp format of cursor terminal file name.  See datetime.strftime for details.
    s3_key              = 'YOUR S3 KEY'
    s3_secret           = 'YOUR S3 SECRET'
    s3_bucket           = 'chatserver'           #S3 Bucket to produce messages to
    s3_cursor           = {
                            'type':'file',
                            'location':os.path.join( os.path.dirname(__file__), 'cursors' ) #Directory to store cursor files under
                        } 

    timeformat          = '%Y-%m-%dT%H:%M:%S'    #Timeformat for datetime objects in JSON messages

class DevConfig(object):
    s3_bucket           = 'chatserver_dev'

CONFIG = Config

If a configuration file external to a muskrat package is desired, the config parameter can be set to the full path of the external config.

#Where /home/hoover/muskrat_config.py is our external config file.

p = Producer( routing_key= 'Simple.Message.Queue', config='/home/hoover/muskrat_config.py' )

c = S3Consumer( 'Simple.Message.Queue', config='/home/hoover/muskrat_config.py' )

@Consumer( 'Simple.Message.Queue', config='/home/hoover/muskrat_config.py' )
def simple_consume( msg ):
    print msg

###TESTING

From the root of this project, run:

$ python -m muskrat.tests.test_s3consumer
$ python -m muskrat.tests.test_producer

###TODO