Skip to content

Commit

Permalink
Refactoring custom job classes by introducing
Browse files Browse the repository at this point in the history
  • Loading branch information
mariano committed May 12, 2015
1 parent e0743c4 commit 4bfd3cd
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 185 deletions.
18 changes: 16 additions & 2 deletions CHANGELOG.md
Expand Up @@ -3,6 +3,18 @@
All Notable changes will be documented in this file. This project adheres to
[Semantic Versioning](http://semver.org/).

## [1.2.0] - 2015-05-12

### Changed
- `JobInterface` is now a simpler interface. Its `load()` and `dump()` methods
have been moved to a `MarshalInterface`, effectively changing how custom Job
classes work.
- `Disque\Queue\MarshalException` has been moved to
`Disque\Queue\Marshal\MarshalException`.
- The `setJobClass()` method in `Queue` has been removed. Instead use
`setMarshaler()`, which should be given an instance of
`Disque\Queue\Marshaler\MarshalerInterface`.

## [1.1.0] - 2015-05-10

### Added
Expand All @@ -16,7 +28,7 @@ itself).
- Added `Disque\Queue\Queue` and `Disque\Queue\Job` class to offer a higher
level API that simplifies queueing and fetching jobs
- Added method `queue()` to `Disque\Client` to create / fetch a queue which
is an instance of `Disque\Queue\Queue`
is an instance of `Disque\Queue\Queue`.
- Added `schedule()` method to `Disque\Queue` that allows to easily schedule
jobs to be processed at a certain time.

Expand Down Expand Up @@ -58,5 +70,7 @@ parameters were specified, an `InvalidCommandArgumentException` was thrown.
- Added support for Predis connections, and allowing adding new connection
methods via `ConnectionInterface`.

[1.1.0]: https://github.com/mariano/disque-php/compare/1.0.0...HEAD
[unreleased]: https://github.com/mariano/disque-php/compare/1.2.0...HEAD
[1.2.0]: https://github.com/mariano/disque-php/releases/tag/1.2.0
[1.1.0]: https://github.com/mariano/disque-php/releases/tag/1.1.0
[1.0.0]: https://github.com/mariano/disque-php/releases/tag/1.0.0
88 changes: 55 additions & 33 deletions docs/README.md
Expand Up @@ -188,17 +188,17 @@ $disque->queue('my_queue')->processed($job);

## Changing the Job class

If you want to change the class used to represent a job, you can specify the
new class implementation (which should implement `Disque\Queue\JobInterface`)
via the queue's `setJobClass()` method. For example:
You can choose to have your own Job classes when using the Queue API. To do
so you start by implementing `Disque\Queue\JobInterface`, and make the class
take whatever shape you deem necessary. For example:

```php
class EmailJob implements \Disque\Queue\JobInterface
{
private $id;
private $email;
private $subject;
private $message;
public $email;
public $subject;
public $message;

public function __construct($email, $subject, $message)
{
Expand All @@ -207,28 +207,6 @@ class EmailJob implements \Disque\Queue\JobInterface
$this->message = $message ?: 'No message';
}

public static function load($source)
{
$body = @json_decode($source, true);
if (is_null($body)) {
throw new \Disque\Queue\MarshalException("Could not deserialize {$source}");
} elseif (!is_array($body) || empty($body['email']) || empty($body['subject'])) {
throw new \Disque\Queue\MarshalException("This doesn't look like an email");
}

$body += ['message' => null];
return new static($body['email'], $body['subject'], $body['message']);
}

public function dump()
{
return json_encode([
'email' => $this->email,
'subject' => $this->subject,
'message' => $this->message
]);
}

public function getId()
{
return $this->id;
Expand All @@ -248,12 +226,59 @@ class EmailJob implements \Disque\Queue\JobInterface
}
```

You can now push your email jobs to a queue:
You then have to create a Marshaler: a way for this job class to be serialized,
and deserialized. Marshalers should implement
`Disque\Queue\Marshal\MarshalerInterface`. You would normally use JSON, but
you are not required to. For example to create a marshaler for the above
`EmailJob` class we could do:

```php
class EmailJobMarshaler implements \Disque\Queue\Marshal\MarshalerInterface
{
public function unmarshal($source)
{
$body = @json_decode($source, true);
if (is_null($body)) {
throw new \Disque\Queue\Marshal\MarshalException("Could not deserialize {$source}");
} elseif (!is_array($body) || empty($body['email']) || empty($body['subject'])) {
throw new \Disque\Queue\Marshal\MarshalException('Not an email job');
}
$body += ['message' => null];
return new EmailJob($body['email'], $body['subject'], $body['message']);
}

public function marshal(\Disque\Queue\JobInterface $job)
{
if (!($job instanceof EmailJob)) {
throw new \Disque\Queue\Marshal\MarshalException('Not an email job');
}
return json_encode([
'email' => $job->email,
'subject' => $job->subject,
'message' => $job->message
]);
}
}
```

So as you can see `unmarshal()` will take a string, and should return an
instance of `Disque\Queue\JobInterface`, or throw a
`Disque\Queue\Marshal\MarshalException` if something went wrong. Similarly
`marshal()` takes a `Disque\Queue\JobInterface` and returns its string
representation, throwing a `Disque\Queue\Marshal\MarshalException` if something
went wrong.

To use this marshaler you create an instance of it, and set it via the Queue
`setMarshaler()` method:

```php
$queue = $disque->queue('emails');
$queue->setJobClass(EmailJob::class);
$queue->setMarshaler(new EmailJobMarshaler());
```

You can now push jobs to the queue:

```php
$job = new EmailJob('claudia@example.com', 'Hello world!', 'Hello from Disque :)');
$queue->push($job);
echo "JOB #{$job->getId()} pushed!\n";
Expand All @@ -263,9 +288,6 @@ When pulling jobs from the queue, you can take advantage of your custom job
implementation:

```php
$queue = $disque->queue('emails');
$queue->setJobClass(EmailJob::class);

while ($job = $queue->pull()) {
echo "Got JOB #{$job->getId()}!\n";
$job->send();
Expand Down
50 changes: 50 additions & 0 deletions src/Queue/BaseJob.php
@@ -0,0 +1,50 @@
<?php
namespace Disque\Queue;

abstract class Job implements JobInterface
{
/**
* Job ID
*
* @var string
*/
private $id;

/**
* Job body
*
* @var array
*/
private $body = [];

/**
* Build a job with the given body
*
* @param array $body Body
*/
public function __construct(array $body = [])
{
$this->setBody($body);
}

/**
* Get job ID
*
* @return string
*/
public function getId()
{
return $this->id;
}

/**
* Set job ID
*
* @param string $id Id
* @return void
*/
public function setId($id)
{
$this->id = $id;
}
}
58 changes: 1 addition & 57 deletions src/Queue/Job.php
@@ -1,17 +1,8 @@
<?php
namespace Disque\Queue;

use InvalidArgumentException;

class Job implements JobInterface
class Job extends BaseJob implements JobInterface
{
/**
* Job ID
*
* @var string
*/
private $id;

/**
* Job body
*
Expand All @@ -29,53 +20,6 @@ public function __construct(array $body = [])
$this->setBody($body);
}

/**
* Creates a JobInterface instance based on data obtained from queue
*
* @param string $source Source data
* @return JobInterface
* @throws MarshalException
*/
public static function load($source)
{
$body = @json_decode($source, true);
if (is_null($body)) {
throw new MarshalException("Could not deserialize {$source}");
}
return new static($body);
}

/**
* Marshals the body of the job ready to be put into the queue
*
* @return string Source data to be put in the queue
*/
public function dump()
{
return json_encode($this->getBody());
}

/**
* Get job ID
*
* @return string
*/
public function getId()
{
return $this->id;
}

/**
* Set job ID
*
* @param string $id Id
* @return void
*/
public function setId($id)
{
$this->id = $id;
}

/**
* Get job body
*
Expand Down
16 changes: 0 additions & 16 deletions src/Queue/JobInterface.php
Expand Up @@ -3,22 +3,6 @@

interface JobInterface
{
/**
* Creates a JobInterface instance based on data obtained from queue
*
* @param string $source Source data coming from the queue
* @return JobInterface
* @throws MarshalException
*/
public static function load($source);

/**
* Marshals the body of the job ready to be put into the queue
*
* @return string Source data to be put in the queue
*/
public function dump();

/**
* Get job ID
*
Expand Down
35 changes: 35 additions & 0 deletions src/Queue/Marshal/JobMarshaler.php
@@ -0,0 +1,35 @@
<?php
namespace Disque\Queue\Marshal;

use Disque\Queue\Job;
use Disque\Queue\JobInterface;

class JobMarshaler implements MarshalerInterface
{
/**
* Creates a JobInterface instance based on data obtained from queue
*
* @param string $source Source data
* @return JobInterface
* @throws MarshalException
*/
public function unmarshal($source)
{
$body = @json_decode($source, true);
if (is_null($body)) {
throw new MarshalException("Could not deserialize {$source}");
}
return new Job($body);
}

/**
* Marshals the body of the job ready to be put into the queue
*
* @param JobInterface $job Job to put in the queue
* @return string Source data to be put in the queue
*/
public function marshal(JobInterface $job)
{
return json_encode($job->getBody());
}
}
8 changes: 8 additions & 0 deletions src/Queue/Marshal/MarshalException.php
@@ -0,0 +1,8 @@
<?php
namespace Disque\Queue\Marshal;

use Disque\Queue\QueueException;

class MarshalException extends QueueException
{
}
24 changes: 24 additions & 0 deletions src/Queue/Marshal/MarshalerInterface.php
@@ -0,0 +1,24 @@
<?php
namespace Disque\Queue\Marshal;

use Disque\Queue\JobInterface;

interface MarshalerInterface
{
/**
* Creates a JobInterface instance based on data obtained from queue
*
* @param string $source Source data
* @return JobInterface
* @throws MarshalException
*/
public function unmarshal($source);

/**
* Marshals the body of the job ready to be put into the queue
*
* @param JobInterface $job Job to put in the queue
* @return string Source data to be put in the queue
*/
public function marshal(JobInterface $job);
}
6 changes: 0 additions & 6 deletions src/Queue/MarshalException.php

This file was deleted.

0 comments on commit 4bfd3cd

Please sign in to comment.