Skip to content

Latest commit

 

History

History
720 lines (474 loc) · 38.3 KB

DEV_php.md

File metadata and controls

720 lines (474 loc) · 38.3 KB

Add your own Badge to the Plan9 PHP Backend

For more information on installing all pre-requistes and playing Plan 9 From Outer Space, see the main README.md.

Introduction

The Plan9 From Outer Space game demonstrates the integration of a massively parallel data-driven real-time backend based on Apache Storm into a web application that features frequent user interaction. For demonstration purposes, we use the Storm backend to notice certain events, based on which users are awarded "badges". Badges modify the number of points of a user (awarding bonus points, removing penalty points) or trigger specific actions (like giving a user the superpower of receiving the points generated by all other users for a certain period of time).

These simple badges, however, demonstrate problems often encoutered in the real-world, like performing joins of multiple data streams, or enhancing user events with background informations (like noticing when a user enters the website category, where he spends most of his time and awarding special offers based onthe revenue he generated last month).

The HighFiveBolt.php provides a simple example of joining tuples in a data stream using the Redis key-value store.

Here, we will create another Bolt that performs a local aggregation (for simplicity also backed by Redis) and periodically emits a data tuple based on that short-lived aggregation.

The story background in the Plan9 game is as follows:

  • Users play and uncover pixels, earning points
  • Periodically, zombies called the "Kitten Robbers From Outer Space" will come and steal kittens from some player.
  • Because the zombies are deluded in their quest to equalize the world, they will always attack the player that recently made the most points

The Storm bolt thus has to implement the following logic:

  • Read the Plan9 points tuple stream
  • Aggregate the generated points for each user
  • After a certain interval, pick the user with the most points and create a badge message that:
    1. Informs the user of the attack
    2. Remove 20% of the recently earned points from the user to simulate a kitten robbery
  • Clear the aggregation map for the next round

To trigger the periodic attack action, we will use a Storm feature called "tick tuples". Each component in a Storm topology can be individually configured to receive a "tick tuple" from Storm itself at a pre-set frequency. We provide a skeleton PHP bolt implementation that processes tick tuples: EmptyTickTupleBolt.php, as well as a Storm multilang adapter that provides an easy way to configure tick tuples: MultilangAdapterTickTupleBolt.java.

Read more about Storm tick tuples here: Excursus: Tick Tuples in Storm 0.8+.

Implement PHP Component

To create the described Diluted Kitten Robbers Features, we will first create a new PHP bolt called "DelutedKittenRobbersBolt" based on the EmptyTickTupleBolt.php in the "deck36-php-web-app" project.

Because the web app uses the Symfony2 framework, we need not only copy an existing bolt implementation and extend it, but we also need do this in the context of a Symfony2 bundle and create the necessary configurations to enable our bolt to use dependency injection. As the bolt will be executed as a shell command from Storm, we create a Symfony2 command that we can later on invke from Storm. We will group all our PHP bolt implementations with a Symfony2 bundle called StormBundle.

cd src/Deck36/Bundle/StormBundle/Command/
cp EmptyTickTupleBolt.php DeludedKittenRobbers.php
cp EmptyTickTupleBoltCommand.php DeludedKittenRobbersCommand.php

We have thus created two new classes, DeludedKittenRobbers and DeludedKittenRobbersCommand. The DeludedKittenRobbers class implements our Storm component and its respective business logic. To this end, it extends BasicBolt, a class implementing the Storm multilang protocol which is found in the storm.php file.

The DeludedKittenRobbersCommand class implements the Symfony2 ContainerAwareCommand and thus creates a Symfony2 command that will be provided with the Symfony2 service container, so it can access databases and other configurations.

Define new Symfony2 Bolt command

We modify the DeludedKittenRobbersCommand in the following way:

<?php 

namespace Deck36\Bundle\StormBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class DeludedKittenRobbersCommand extends ContainerAwareCommand
{

    protected function configure()
    {
        $this
            ->setName('storm:bolt:DeludedKittenRobbers')
            ->setDescription('Start Plan9 DeludedKittenRobbers Storm Bolt')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $bolt = $this->getContainer()->get("deck36_storm.command.deluded_kitten_robbers_bolt");
        $bolt->run();     
    }
}

As you can see, the command implementation will access the service container from the deck36_storm.command.deluded_kitten_robbers_bolt service. We will define that service in a minute.

Create Bolt configuration

In order to be able to configure the bolt behaviour, we need to define its parameters in the storm.yml file.

Please note:

  1. You can override the defaults set in storm.yml for the dev and prod environments by modifying the storm_dev.yml and storm_prod.yml files respectively.
  2. The yaml files will also be read by the Storm topology which is implemented in Java. Therefore, some values might be required, but are not used by the PHP implementation.

You can copy the configuration structure from the EmptyTickTupleBolt and modify it respectively:

DeludedKittenRobbersBolt:
    main:       "storm:bolt:DeludedKittenRobbers"
    params:
    rabbitmq:
        exchange:        "plan9"
        routing_key:     "#"
        target_exchange: "plan9-backchannel"
    attack_frequency_secs:  10        
    badge:
        name: "DeludedKittenRobbers"
        text: "The deluded Kitten Robbers came from Outer Space to punish you for exhilarating too many too young kittens."
        color:   "red"
        size:    "2em"
        effect:  "explode"

Add configuration tree

We need to define a config tree so that Symfony2 can validate the configuration we provide for our DeludedKittenRobbersBolt. To this end we need to add it to StormBundle/DependencyInjection/Configuration.php:

->arrayNode('DeludedKittenRobbersBolt')
    ->children()
        ->scalarNode('main')
            ->defaultValue('storm:bolt:DeludedKittenRobbers')
        ->end()
        ->variableNode('params')                            
        ->end()
        ->arrayNode('rabbitmq')
            ->children()
                ->scalarNode('exchange')
                    ->defaultValue('plan9')
                ->end()
                ->scalarNode('routing_key')
                    ->defaultValue('#')
                ->end()
                ->scalarNode('target_exchange')
                    ->defaultValue('plan9_target')
                ->end()                                
            ->end()
        ->end()
        ->integerNode('attack_frequency_secs')
            ->min(0)
        ->end()
        ->arrayNode('badge')
            ->children()
                ->scalarNode('name')
                ->end()
                ->scalarNode('text')
                ->end()
                ->scalarNode('color')
                ->end()
                ->scalarNode('size')
                ->end()
                ->scalarNode('effect')
                ->end()
            ->end()
        ->end()
    ->end()
->end()

Define Symfony2 service

We now need to define the Symfony2 service which will read and use the configuration, we just created.

We can add the service in StormBundle/Resources/config/services.xml.

First we need to create a parameter which defines which class actually implements our bolt:

<parameters>
    
    <parameter key="deck36_storm.command.deluded_kitten_robbers_bolt.class">Deck36\Bundle\StormBundle\Command\DeludedKittenRobbers</parameter>
    
    ...        

</parameters>

Then we can add the parameters from the config which shall be passed into the service container:

<services>

    <service id="deck36_storm.command.deluded_kitten_robbers_bolt"
             class="%deck36_storm.command.deluded_kitten_robbers_bolt.class%">              
        <argument type="service" id="service_container" />  
      
        <argument>%deludedkittenrobbers__badge_name%</argument>  
        <argument>%deludedkittenrobbers__badge_text%</argument>  
        <argument>%deludedkittenrobbers__badge_size%</argument>  
        <argument>%deludedkittenrobbers__badge_color%</argument> 
        <argument>%deludedkittenrobbers__badge_effect%</argument>

    </service>

    ...

</services>

Extend Deck36StormExtension

We now need to add code to the Deck36StormExtension in order to load our parameters into the service container:

// DeludedKittenRobbersBolt        
$container->setParameter('deludedkittenrobbers__badge_name',   $config['DeludedKittenRobbersBolt']['badge']['name']);
$container->setParameter('deludedkittenrobbers__badge_text',   $config['DeludedKittenRobbersBolt']['badge']['text']);
$container->setParameter('deludedkittenrobbers__badge_size',   $config['DeludedKittenRobbersBolt']['badge']['size']);
$container->setParameter('deludedkittenrobbers__badge_color',  $config['DeludedKittenRobbersBolt']['badge']['color']);
$container->setParameter('deludedkittenrobbers__badge_effect', $config['DeludedKittenRobbersBolt']['badge']['effect']);

Implement Bolt Business Logic

Now that we have setup our Symfony2 command, its service container, and have populated it with data, we can finally implement our mian business logic.

It starts with the usual PHP header, we extend BasicBolt to use the underlying implementation of the Storm multilang protocol and define a constructor to receive the service container and our defined parameters. Furthermore, we extract the user manager, so we can use it to access and modify the user-related Doctrine entities in the persistence layer.

<?php 

namespace Deck36\Bundle\StormBundle\Command;
 
use Deck36\Bundle\Plan9Bundle\Entity\Badge;
use Symfony\Component\DependencyInjection\Container;

require_once('storm.php');

class DeludedKittenRobbers extends BasicBolt
{

    private $container;
    private $userManager;

    private $deludedKittenRobbersBadgeName;
    private $deludedKittenRobbersBadgeText;
    private $deludedKittenRobbersBadgeSize;
    private $deludedKittenRobbersBadgeColor;
    private $deludedKittenRobbersBadgeEffect;
    
    public function __construct(Container $container,
                                $deludedKittenRobbersBadgeName,
                                $deludedKittenRobbersBadgeText,
                                $deludedKittenRobbersBadgeSize,
                                $deludedKittenRobbersBadgeColor,
                                $deludedKittenRobbersBadgeEffect
                                ) {
        parent::__construct();
        $this->container = $container;
        $this->userManager = $container->get('fos_user.user_manager'); 

        $this->deludedKittenRobbersBadgeName = $deludedKittenRobbersBadgeName;
        $this->deludedKittenRobbersBadgeText = $deludedKittenRobbersBadgeText;
        $this->deludedKittenRobbersBadgeSize = $deludedKittenRobbersBadgeSize;
        $this->deludedKittenRobbersBadgeColor = $deludedKittenRobbersBadgeColor;
        $this->deludedKittenRobbersBadgeEffect = $deludedKittenRobbersBadgeEffect;        
    }

// ...
}

Now, because we use tick tuples, we need a way to discern usual tuples from those special tick tuples. Storm will send tick tuples from the __tick stream of the special __system component. We can therefore check if tuples originate from this stream to detect tick tuples:

private function isTickTuple(Tuple $tuple) {
    return (($tuple->component === '__system') && ($tuple->stream === '__tick'));
}

The main method that is called for each tuple in a Storm bolt component is the process method. We can use a simple pattern to branch the processing depending on the type of tuple received:

private function processTickTuple(Tuple $tuple) {
    // process tick tuples
}

private function processTuple(Tuple $tuple) {
    // process regular tuples
}


public function process(Tuple $tuple)
{
    // check for tick tuple
    if ($this->isTickTuple($tuple)) {            
        $this->processTickTuple($tuple);
    } else {
        $this->processTuple($tuple);
        $this->ack($tuple);
    }            
}

For our bolt implementing the game logic of random attacks of kitten friends by deluded kitten robbers from outer space, we don't need to process any regular tuples. We thus only need to implement the processTickTuple method:

private function processTickTuple(Tuple $tuple) {
    // process tick tuples

    // get a date object to determine the current time
    $date = new \DateTime();
 
    //// Get a random active user from redis

    // Get redis client from Symfony2 service container
    $redis = $this->container->get("snc_redis.default");

    // Active users do have a session key starting with 'user_'
    // We query all active users from Redis:
    $activeUsers = $redis->keys('user_*');

    // Now we select a random user as the one to be attacked 
    // by the Kitten Robbers:
    $randIdx = 0;
    $randIdx = rand(0, count($activeUsers)-1);
    
    $robbedUserStr = $activeUsers[$randIdx];

    $tokens = explode("_", $robbedUserStr);
    $robbedUser = intval($tokens[1]);


    //// persist badge to database
    $userRef = $this->userManager->findUserBy(array('id' => $robbedUser));

    $badge = new Badge();
    $badge->setName($this->deludedKittenRobbersBadgeName);
    $badge->setImage("");
    $badge->setCreatedAt($date);

    $userRef->addBadge($badge);
    $this->userManager->updateUser($userRef);

    //// construct badge message and emit to further Storm components
    $deludedKittenRobbers = array();        
    $deludedKittenRobbers['user'] = array();
    $deludedKittenRobbers['user']['user_id'] = $robbedUser;
    
    $deludedKittenRobbers['timestamp'] = $date->getTimestamp();

    $deludedKittenRobbers['type'] = 'badge';
    $deludedKittenRobbers['version'] = 1;
    
    $deludedKittenRobbers['badge'] = array();
    $deludedKittenRobbers['badge']['name'] = $this->deludedKittenRobbersBadgeName;
    $deludedKittenRobbers['badge']['text'] = $this->deludedKittenRobbersBadgeText;
    $deludedKittenRobbers['badge']['size'] = $this->deludedKittenRobbersBadgeSize;
    $deludedKittenRobbers['badge']['color'] = $this->deludedKittenRobbersBadgeColor;
    $deludedKittenRobbers['badge']['effect'] = $this->deludedKittenRobbersBadgeEffect;
    
    $deludedKittenRobbers['points'] = array();
    $deludedKittenRobbers['points']['increment'] = -100;            

    $deludedKittenRobbers['action'] = array();
    $deludedKittenRobbers['action']['type'] = 'unsolve';
    $deludedKittenRobbers['action']['amount'] = 1;

    // emit the badge message 
    // we emit the tuple unanchored, because we 
    // don't anchor to ephemeral tick tuples
    $this->emit([$deludedKittenRobbers]);    

}

Test the Deluded Kitten Robbers

Our storm bolt command is now ready. We can execute it using:

app/console storm:bolt:DeludedKittenRobbers

The bolt will now wait for stdin input according to the Storm multilang protocol. We provide an exemplary multilang log in test/test_multilang_bolt__KittenRobber.txt.

You can just pipe that log into the bolt to see it in action:

$ cat ../deck36-api-backend/test/test_multilang_bolt__KittenRobber.txt | app/console storm:bolt:DeludedKittenRobbers
{"pid":9915}
end
{"command":"sync"}
end
{"command":"ack","id":"-6955786537413359385"}
end
{"command":"ack","id":"-6955786537413359385"}
end
{"command":"ack","id":"-6955786537413359385"}
end
{"command":"ack","id":"-6955786537413359385"}
end
{"command":"ack","id":"-6955786537413359385"}
end
{"command":"ack","id":"-6955786537413359385"}
end
{"command":"emit","anchors":["-6955786537413359385"],"tuple":[{"user":{"user_id":1},"timestamp":1411028105,"type":"badge","version":1,"badge":{"name":"DeludedKittenRobbers","text":"The deluded Kitten Robbers came from Outer Space to punish you for exhilarating too many too young kittens.","size":"2em","color":"red","effect":"explode"},"points":{"increment":-100},"action":{"type":"unsolve","amount":1}}]}
end
{"command":"ack","id":"-6955786537413359385"}
end
{"command":"ack","id":"-6955786537413359385"}
end

You can now see the multilang protocol in action. The first message returned by the bolt is its pid, so Storm can monitor or terminate the process if necessary.

The second message is a "sync" command. The "sync" command is deprecated for bolts, but required for spouts. Therefore, for bolts, it only happens once during the initial handshake.

Then there are a number of acknowledgements, because the test file contains several "points" messages to allow testing of more complex logic than we have currently implemented. Then, the test log contains a tick tuple, which triggers our logic and a "DeludedKittenRobbers" badge is produced.

Implement Storm Topology

In order to connect our badge to the awesome Plan9 game, we now need to create Storm topology that uses the bolt and deploy that topology to the Storm cluster.

As Storm is based on the Java Virtual Machine environment, we need to implement the topology in Java. This is not hard and you can think of it as a wrapper. We then need to package our wrapper topology and our PHP project into a single JAR (Java ARchive) file, which then allows us to deploy the whole code to a Storm Cluster.

Package PHP project as Phar

In order to include our PHP project into the Storm topology JAR, we must follow strict rules about where our custom code resides in the JAR, so that Storm will be able to find our code and all its dependencies. This task becomes very easy when we first package our PHP project into a Phar (PHp ARchive). To make this step easy, we use the Box project, a tool that greatly simplifies the creation of Phar archives.

In order to create a Phar archive using Box, we must create a box.json file to define the Phar we want to build:

{
    "chmod": "0755",
    "directories": [
        "src", "app"
    ],
    "files": [
        "LICENSE"
    ],
    "finder": [
        {
            "name": "*",
            "exclude": ["Tests", "tests", "phpunit", "mockery", "guzzle", "behat", "sebastian", "sqmk", "twitter"],
            "in": "vendor"
        }
    ],
    "git-version": "package_version",
    "main": "app/console",
    "output": "target/resources/deck36-plan9-storm.phar",
    "stub": true
}

As you can see, we include the src and app directories and exclude dependenies that are only required during development stages. We also define that the Phar will run app/console if executed. Furthermore, our Phar file should be put into the target/resources folder and be called deck36-plan9-storm.phar.

IMPORTANT NOTES:

  1. Box will write and modify the Phar multiple times in order to create it. As modifying Phar is a potential security threat in webservers, the default PHP configuration prohibits to write to Phar files. In order to allow Box to create the Phar, you must specify phar.readonly=0 in your PHP configuration.

  2. The PHP project contains many PHP source files. As every file requires a change of the Phar archive, the creation process will perform a lot of I/O operations. These operations are particularly slow, when they are executed on files that are mounted over the network. The Phar creation process will therefore be much faster, if it is performed not within the Vagrant, but in the host system while the Vagrant is not running, so the filesystem will not be mounted.

You can create the Phar file by simply executing ./box.phar build. However, because that will take TODO-QUITE-SOME-TIME, we provide the Phar file already in our repository.

Box also allows to modify or add files to an existing Phar archive, so to not stall the workshop, we will simply add all the files we have changed to the Phar file:

# add Symfony2 service definition

./box.phar add -r target/resources/deck36-plan9-storm.phar src/Deck36/Bundle/StormBundle/DependencyInjection/Configuration.php src/Deck36/Bundle/StormBundle/DependencyInjection/Configuration.php

./box.phar add -r target/resources/deck36-plan9-storm.phar src/Deck36/Bundle/StormBundle/DependencyInjection/Deck36StormExtension.php src/Deck36/Bundle/StormBundle/DependencyInjection/Deck36StormExtension.php

./box.phar add -r target/resources/deck36-plan9-storm.phar src/Deck36/Bundle/StormBundle/Resources/config/routing.xml src/Deck36/Bundle/StormBundle/Resources/config/routing.xml

./box.phar add -r target/resources/deck36-plan9-storm.phar src/Deck36/Bundle/StormBundle/Resources/config/services.xml src/Deck36/Bundle/StormBundle/Resources/config/services.xml

# add Storm bolt configuration

./box.phar add -r target/resources/deck36-plan9-storm.phar app/config/storm.yml app/config/storm.yml
./box.phar add -r target/resources/deck36-plan9-storm.phar app/config/storm_dev.yml app/config/storm_dev.yml
./box.phar add -r target/resources/deck36-plan9-storm.phar app/config/storm_prod.yml app/config/storm_prod.yml

# add PHP bolt implementation

./box.phar add -r target/resources/deck36-plan9-storm.phar src/Deck36/Bundle/StormBundle/Command/DeludedKittenRobbers.php src/Deck36/Bundle/StormBundle/Command/DeludedKittenRobbers.php

./box.phar add -r target/resources/deck36-plan9-storm.phar src/Deck36/Bundle/StormBundle/Command/DeludedKittenRobbersCommand.php src/Deck36/Bundle/StormBundle/Command/DeludedKittenRobbersCommand.php

You can now run target/resources/deck36-plan9-storm.phar to see that we have successfully added our DeludedKittenRobbers bolt command. Now test, if the Phar file is functional:

cat ../deck36-api-backend/test/test_multilang_bolt__KittenRobber.txt | target/resources/deck36-plan9-storm.phar storm:bolt:DeludedKittenRobbers

Create Java wrapper topology

Within the Storm wrapper topology, we will use the DECK36 MultilangAdapterTickTupleBolt.java wrapper. That Java class implements a Bolt that will forward all tuples to the PHP bolt and configures Storms to send tick tuples.

To create the full topology, we start from the HighFiveBadgeTopology already present in our "deck36-storm-backend-php" project:

cd src/jvm/deck36/storm/plan9/php
cp HighFiveBadgeTopology.java DeludedKittenRobbersTopology.java

Let's now walk through this topology an make the necessary changes.

First, let's rename our topology:

public class DeludedKittenRobbersTopology {

    private static final Logger log = LoggerFactory.getLogger(DeludedKittenRobbersTopology.class);

Then, we already come to our main method. The first part check, if we have supplied a command line parameter that must be either "dev" or "prod".

    public static void main(String[] args) throws Exception {

        String env = null;

        if (args != null && args.length > 0) {
            env = args[0];
        }

        if (! "dev".equals(env))
            if (! "prod".equals(env)) {
                System.out.println("Usage: $0 (dev|prod)\n");
                System.exit(1);
            }

We will use that parameter for:

  1. Choose which config we will load.
  2. Decide whether to run the topology in either local mode ("dev") or whether it should be deployed to the cluster ("prod").

Storm features a topology-wide config. This config controls several aspects of Storm internals (like timeouts, etc.), and is also forwarded into each component (bolts or spouts). We will thus add our application config to the main topology config. We will load our application config from YaML files supplied with the "deck36-php-web-app" project.

Let's read our config:

        // Topology config
        Config conf = new Config();

        // Load parameters and add them to the Config
        Map configMap = YamlLoader.loadYamlFromResource("storm_" + env + ".yml");

        conf.putAll(configMap);

        log.info(JSONValue.toJSONString((conf)));

        // Set topology loglevel to DEBUG
        conf.put(Config.TOPOLOGY_DEBUG, JsonPath.read(conf, "$.deck36_storm.debug"));

The next main part is the Storm TopologyBuilder. The TopologyBuilder is used to add all components (bolts/spouts) and to configure all of their settings (connections between components, parallelism).

        // Create Topology builder
        TopologyBuilder builder = new TopologyBuilder();

There are two parameters for configuring the parallelism. The parallelism hint configures the number of executors, while num tasks configures the number of tasks. An executor is an actual JVM thread that reads and writes from/to the internal Storm queues. Two separate executors can thus be run on two different machines. A task, however, is a pseudo-parallelization within one executor. If you have, for instance, two executors and 8 *tasks, then Storm will launch two threads executing 4 tasks each. Within one executor, these tasks will be executed sequentially.

This starts to make sense taking two moe points into account: the number of tasks is fixed for the lifetime of a topology, while the number of actual executors can be changed dynamically. Thus, by initially specifying just one executor, but 8 tasks you can later on expand processing for that component onto up to 8 machines.

While tasks are very lightweight as long as you use pure JVM components, one caveat when using the multilang protocol is that each task spawns its own external processing process. Thus, a JVM bolt with one executor and 8 tasks will equal to one thread in the JVM, while a multilang bolt with the same configuration will be equal to one JVM thread plus 8 spawned system processes. Therefore, don't overplan when using multilang bolts. As a side note, there are further strategies, like deploying a certain topology multiple times or do rolling re-deployments, to deal with dynamic parallelism changes that might be more suitable when dealing with a lot multilang bolts.

        // if there are not special reasons, start with parallelism hint of 1
        // and multiple tasks. By that, you can scale dynamically later on.
        int parallelism_hint = JsonPath.read(conf, "$.deck36_storm.default_parallelism_hint");
        int num_tasks = JsonPath.read(conf, "$.deck36_storm.default_num_tasks");

Now, having finished preparing our configuration, we come to our first component: a spout reading messages from RabbitMQ from storm-rabbitmq. Please check the repository from that poject for more information on how to use and configure the spout. The main thing we have to do here and now is to change the JSONPath to our bolt configuration. We will use the key "DeludedKittenRobbersBolt" for this configuration, which we will create later:

        // Create Stream from RabbitMQ messages
        // bind new queue with name of the topology
        // to the main plan9 exchange (from properties config)
        // consuming only CBT-related events by using the rounting key 'cbt.#'

        String badgeName = DeludedKittenRobbersTopology.class.getSimpleName();

        String rabbitQueueName = badgeName; // use topology class name as name for the queue
        String rabbitExchangeName = JsonPath.read(conf, "$.deck36_storm.DeludedKittenRobbersBolt.rabbitmq.exchange");
        String rabbitRoutingKey = JsonPath.read(conf, "$.deck36_storm.DeludedKittenRobbersBolt.rabbitmq.routing_key");


        // Get JSON deserialization scheme
        Scheme rabbitScheme = new SimpleJSONScheme();

        // Setup a Declarator to configure exchange/queue/routing key
        RabbitMQDeclarator rabbitDeclarator = new RabbitMQDeclarator(rabbitExchangeName, rabbitQueueName, rabbitRoutingKey);

        // Create Configuration for the Spout
        ConnectionConfig connectionConfig =
                new ConnectionConfig(
                        (String)    JsonPath.read(conf, "$.deck36_storm.rabbitmq.host"),
                        (Integer)   JsonPath.read(conf, "$.deck36_storm.rabbitmq.port"),
                        (String)    JsonPath.read(conf, "$.deck36_storm.rabbitmq.user"),
                        (String)    JsonPath.read(conf, "$.deck36_storm.rabbitmq.pass"),
                        (String)    JsonPath.read(conf, "$.deck36_storm.rabbitmq.vhost"),
                        (Integer)   JsonPath.read(conf, "$.deck36_storm.rabbitmq.heartbeat"));

        ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
                .queue(rabbitQueueName)
                .prefetch((Integer) JsonPath.read(conf, "$.deck36_storm.rabbitmq.prefetch"))
                .requeueOnFail()
                .build();

        // add global parameters to topology config - the RabbitMQSpout will read them from there
        conf.putAll(spoutConfig.asMap());

        // For production, set the spout pending value to the same value as the RabbitMQ pre-fetch
        // see: https://github.com/ppat/storm-rabbitmq/blob/master/README.md
        if ("prod".equals(env)) {
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, (Integer) JsonPath.read(conf, "$.deck36_storm.rabbitmq.prefetch"));
        }

We can now add our RabbitMQ spout by using the "setSpout()" method of our TopologyBuilder instance:

        // Add RabbitMQ spout to topology
        builder.setSpout("incoming",
                new RabbitMQSpout(rabbitScheme, rabbitDeclarator),
                parallelism_hint)
                .setNumTasks((Integer) JsonPath.read(conf, "$.deck36_storm.rabbitmq.spout_tasks"));

The messages emitted by the RabbitMQ spout are then directly fed into our PHP bolt. In order to invoke it, we need to build a list representing our invocation of our PHP Phar binary, the bolt to execute, and optional arguments.

    // construct command to invoke the external bolt implementation
    ArrayList<String> command = new ArrayList(15);

    // Add main execution program (php, hhvm, zend, ..) and parameters
    command.add((String) JsonPath.read(conf, "$.deck36_storm.php.executor"));
    command.addAll((List<String>) JsonPath.read(conf, "$.deck36_storm.php.executor_params"));

    // Add main command to be executed (app/console, the phar file, etc.) and global context parameters (environment etc.)
    command.add((String) JsonPath.read(conf, "$.deck36_storm.php.main"));
    command.addAll((List<String>) JsonPath.read(conf, "$.deck36_storm.php.main_params"));

    // Add main route to be invoked and its parameters
    command.add((String) JsonPath.read(conf, "$.deck36_storm.DeludedKittenRobbersBolt.main"));
    List boltParams = (List<String>) JsonPath.read(conf, "$.deck36_storm.DeludedKittenRobbersBolt.params");
    if (boltParams != null)
        command.addAll(boltParams);

    // Log the final command
    log.info("Command to start bolt for Deluded Kitten Robbers: " + Arrays.toString(command.toArray()));

We are now ready to actually add our PHP bolt to the topology. To this end, we use the MultilangAdapterTickTupleBolt.java and read the tick frequency from the configuration. We simply call our bolt "badge" and connect to the RabbitMQ spout by using .shuffleGrouping("incoming"). Note: "incoming" is the name we have given to our RabbitMQ spout.

    // Add constructed external bolt command to topology using MultilangAdapterTickTupleBolt
    builder.setBolt("badge",
            new MultilangAdapterTickTupleBolt(
                    command,
                    (Integer) JsonPath.read(conf, "$.deck36_storm.DeludedKittenRobbersBolt.attack_frequency_secs"),
                    "badge"
            ),
            parallelism_hint)
            .setNumTasks(num_tasks)
            .shuffleGrouping("incoming");

We will now receive the Kitten Robber attack badges from our badge bolt. We now only need two more bolts, one to serialize and add routing informations, and another one that actually pushes the data abck to RabbitMQ. The router bolt and the push bolt could be easily merged into one, but the separation keeps the push bolt so general, that it can be easily used to push messages received from multiple Storm streams within more complex topologies:

        builder.setBolt("rabbitmq_router",
                new Plan9RabbitMQRouterBolt(
                        (String) JsonPath.read(conf, "$.deck36_storm.DeludedKittenRobbersBolt.rabbitmq.target_exchange"),
                        "DeludedKittenRobbers" // RabbitMQ routing key
                ),
                parallelism_hint)
                .setNumTasks(num_tasks)
                .shuffleGrouping("badge");

        builder.setBolt("rabbitmq_producer",
                new Plan9RabbitMQPushBolt(),
                parallelism_hint)
                .setNumTasks(num_tasks)
                .shuffleGrouping("rabbitmq_router");

Hooray! Our topology is finished and be executed! For testing purposes, a Storm cluster can be emulated using LocalCluster. In that case, we need to add an artificial delay using Thread.sleep() to keep the JVM from exiting after submitting the topology to the LocalCluster in the same JVM.

        if ("dev".equals(env)) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(badgeName + System.currentTimeMillis(), conf, builder.createTopology());
            Thread.sleep(2000000);
        }

For production use, we only need to use the static `StormSubmitter.submitTopology()' and off we go!

        if ("prod".equals(env)) {
            StormSubmitter.submitTopology(badgeName + "-" + System.currentTimeMillis(), conf, builder.createTopology());
        }

Et voila!

Package Storm Topology

We need to compile our Java wrappers and the Java Topology and bundle it all up into an "uberjar" that contains all dependencies and can be deployed to the cluster. Note: As of July, 2014, Storm and all submitted topologies will share the same classpath. That might cause problems, when requiring dependencies in different incompatible version to those that are required by Storm. There are efforts to change that in the future, but it hasn't yet arrived. However, as we use multilang bolts for our main logic, we don't need to care too much. For the same reason, we need Storm as a dependency to compile our classes, but we must not include it in our uberjar. Otherwise it will not be possible to submit the topology to an actual cluster. Our build.sh handles that for us.

The target directory of our "deck36-php-web-app" project (where our Phar file resides) is symlinked to our "deck36-storm-backend-php" resources directory and will thus be included in the JAR file automatically.

If you have not prepared the build environment as described in the deck36-storm-backend-php README.md, then you need to do this now:

./prepare.sh

Once the build enviroment is prepared, build the uberjar by executing:

./build.sh

Run

Locally (for testing)

In our vagrant enviroment, Storm is installed in /opt/storm. We can thus execute the topology locally by:

/opt/storm/bin/storm jar target/deck36-storm-backend-php-0.0.1-SNAPSHOT-standalone.jar deck36.storm.plan9.php.DeludedKittenRobbersTopology dev

Deploy to Cluster

Deploying to a cluster is very simple as well, because all the cluster config is handled by the storm command. We thus just need to switch dev to prod:

/opt/storm/bin/storm jar target/deck36-storm-backend-php-0.0.1-SNAPSHOT-standalone.jar deck36.storm.plan9.php.DeludedKittenRobbersTopology prod