akellehe/phonon

Event joining for efficient, high-volume ingestion pipelines.

About

When your users are sending thousands, or even tens of thousands, of events per second, it becomes hard to keep up with real-time user behavior.

Aggregating writes and flushing them intelligently allows for the most efficient batching possible.

With phonon, you can join events across a cluster of worker/consumer nodes; the reference counting is abstracted away entirely.

You can collect and aggregate events across your cluster, then write to a data backend when the user's session ends. Alternatively, you can flush based on how many events have been aggregated for that user up to that point.

This allows your ingestion pipeline to scale to tens of thousands of client-facing events per second with a single Redis backend. Oh, and phonon provides sharding with linear scaling.
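
For example, connecting phonon to more than one Redis host shards your keys across them (the hostnames below are placeholders):

import phonon.connections

# Keys are distributed across whichever hosts you list here.
phonon.connections.connect(hosts=['redis01.example.com', 'redis02.example.com'])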

Installation

The simplest way to get the project is through the Python Package Index:

pip install phonon

You can also install this package from source with setup.py:

python setup.py install

Or you can use git+ssh to get the bleeding edge:

pip install git+ssh://git@github.com/buzzfeed/phonon.git 

Or you can clone the repository and install it with pip:

git clone git@github.com:buzzfeed/phonon.git ./phonon; cd phonon; pip install .

Run the tests

This package uses the standard setup.py approach:

python setup.py test

You should probably run that in a virtualenv.
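
For example, to create one and run the tests inside it:

virtualenv venv
. venv/bin/activate
python setup.py test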

Getting Started

The latest version of phonon encourages a lock-free, asynchronous approach to aggregation through your Redis cache. With this in mind, we support, but do not encourage, locking. With that said...

References

The building block for this approach to concurrency is the Reference object. You can use References for the following (a sketch follows the list):

  • locking on resources for exclusive reads or writes
  • finding out how many processes are using a resource at a given time
  • keeping track of how many processes have modified that resource
  • executing a callback when a process is the last to finish using a resource
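
Here is a rough sketch of what that looks like in practice. Note: the phonon.reference module path and the lock(), count(), and dereference() methods shown here are assumptions based on the list above, not a confirmed API; check the source for your phonon version.

import phonon.connections
import phonon.reference  # module path is an assumption; check your version

phonon.connections.connect(hosts=['redis01.example.com'])

# Hypothetical: track how many processes are using the 'user:123' resource.
ref = phonon.reference.Reference('user:123')

with ref.lock():        # exclusive access, if you opt into locking
    print(ref.count())  # number of processes currently holding this resource

def flush():
    # Runs only in the last process to release the resource.
    print('Last one out; writing the global aggregate.')

ref.dereference(flush)  # decrement the refcount; fire flush() if we were last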

An example

Let's say we have a process that monitors events on a stream in NSQ, a popular message bus. These streams can be very high volume!

If we want to aggregate locally before writing to a cache, and ultimately to a database, phonon makes that process easy.

import json

import nsq

import phonon.registry
import phonon.connections
import phonon.model
import phonon.field

CONVERSION_EVENTS_TOPIC = 'conversion_events'                    # example topic name
QR_CLICKS_AND_IMPRESSIONS_AGGREGATOR = 'clicks_and_impressions'  # example channel name

class Session(phonon.model.Model):
    id = phonon.field.ID()
    impressions = phonon.field.SumField()
    clicks = phonon.field.SumField()

    def on_complete(self, msg):
        # Write the model to the database. You're guaranteed to have the
        # global aggregate now.
        msg.finish()

def handle_message(msg):
    msg.enable_async()
    body = json.loads(msg.body)
    phonon.registry.register(Session(
        id=body['user_id'],
        impressions=int(body['type'] == 'unit_impression'),
        clicks=int(body['type'] == 'unit_click')
    ), msg)

if __name__ == '__main__':
    phonon.connections.connect(hosts=['redis01.example.com', 'redis02.example.com'])
    nsq.Reader(
        topic=CONVERSION_EVENTS_TOPIC,
        channel=QR_CLICKS_AND_IMPRESSIONS_AGGREGATOR,
        message_handler=handle_message,
        lookupd_http_addresses=['nsq01.example.com', 'nsq02.example.com'],
        max_in_flight=10
    )
    nsq.run()

By declaring a Session with SumField fields populated from the quantity each individual message represents, we ensure they're aggregated in the cache in a way that is lock-free and conflict-free.
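
To see why sums can be aggregated without locks, note that increments are atomic on the Redis server. Here is a minimal illustration of the underlying idea in plain redis-py; it is not phonon's actual implementation, which may differ:

import redis

r = redis.StrictRedis(host='localhost')

def record_impression(user_id):
    # INCRBY executes atomically on the Redis server, so concurrent workers
    # can add to the same counter without locks or read-modify-write races.
    r.incrby('session.%s.impressions' % user_id, 1)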

Be sure to check out the other field types in phonon.field for more values you can aggregate!
