Install disque-php via Composer:
$ composer require mariano/disque-php --no-dev
If you want to run its tests, remove the --no-dev argument.
First you will need to create an instance of Disque\Client, specifying a list of hosts and ports where different Disque nodes are installed:
$client = new \Disque\Client([
    '127.0.0.1:7711',
    '127.0.0.1:7712'
]);
If no host is specified, 127.0.0.1:7711 is assumed. Hosts can also be added via the addServer($host, $port) method:
$client = new \Disque\Client();
$client->addServer('127.0.0.1', 7712);
Now you are ready to start interacting with Disque. This library provides a Queue API for easy job pushing/pulling, and direct access to all Disque commands via its Client API. If you are looking to get jobs into queues, and process them in a simple way, then the Queue API is your best choice as it abstracts Disque's internal protocol. If you instead wish to do more advanced stuff with Disque, use the Client API.
To simplify the most common tasks with Disque this package offers a higher level API to push jobs to Disque, and retrieve jobs from it.
When using the Queue API you won't need to tell the client when to connect, as it will do it automatically as needed. If however you want to influence the way connections are established (such as by setting it to smartly switch between nodes based on where the jobs come from), then go ahead and read the documentation on connections from the Client API.
Before being able to push jobs into queues, or pull jobs from them, you will need to get a queue. You can fetch a queue by name. There is no need to create or delete queues: Disque manages the queue lifecycle automatically. In the examples that follow, $disque is a \Disque\Client instance created as shown above.
$queue = $disque->queue('emails');
Once you have obtained the queue, you can either push jobs to it, or pull jobs from it. Jobs are instances of Disque\Queue\Job, which offers the following methods (among others used by the Queue API):
- getBody(): array: gets the body of the job.
- setBody(array $body): sets the body of the job.
These methods show that by default job bodies are arrays, and they get serialized into JSON when sending them to Disque. If you want to change the default job implementation used in a queue, you can do so as shown in the Changing the Job class section.
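As a rough illustration of the JSON round trip described above, the following standalone sketch encodes a hypothetical job body and decodes it again with PHP's json_encode() and json_decode(). It does not touch Disque, and the exact encoding flags the library uses are not shown in this document, so treat it as an approximation of the idea and not as the library's code.

```php
<?php
// Hypothetical job body: any array that json_encode() can represent.
$body = ['name' => 'Mariano', 'tags' => ['welcome', 'email']];

// Roughly what happens before the job is sent to Disque.
$payload = json_encode($body);

// Roughly what happens when the job is pulled back from Disque.
// Passing true asks for associative arrays instead of stdClass objects.
$restored = json_decode($payload, true);

var_dump($restored === $body);
```

Bodies that contain objects or strings that are not valid UTF-8 do not survive this round trip unchanged, which is one reason to consider a custom job class.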
The simplest way to push a job to the queue is by using its push() method:
$job = new \Disque\Queue\Job(['name' => 'Mariano']);
$disque->queue('my_queue')->push($job);
You can specify different options that will affect how the job is placed on the queue through its second, optional, argument $options. For available options see the documentation on addJob. For example, to push a job to the queue but automatically remove it if it has not been processed after 1 minute, we would do:
$job = new \Disque\Queue\Job(['description' => 'To be handled within the minute!']);
$disque->queue('my_queue')->push($job, ['ttl' => 60]);
If you want to push a job to be processed at a specific time in the future, check out the Scheduling jobs section.
You get jobs, one at a time, using the pull() method. If there are no jobs available, this call will block until a job is placed on the queue. To get a job:
$queue = $disque->queue('my_queue');
$job = $queue->pull();
var_dump($job->getBody());
$queue->processed($job);
Make sure to always acknowledge a job once you are done processing it, as explained in the Acknowledging jobs section.
You can also keep processing jobs in a loop, handling those already queued as well as those that arrive later:
$queue = $disque->queue('my_queue');
while ($job = $queue->pull()) {
    echo "GOT JOB!\n";
    var_dump($job->getBody());
    $queue->processed($job);
}
The call to pull() is blocking, so you may want to do something useful with the time spent waiting for jobs. Fortunately pull() takes an optional argument: the number of milliseconds to wait for a job. If this time passes and no job is available, a Disque\Queue\JobNotAvailableException is thrown. For example, to wait for jobs, do something else whenever 1 second passes without a job becoming available, and then keep waiting, we would do:
$queue = $disque->queue('my_queue');
while (true) {
    try {
        $job = $queue->pull(1000);
    } catch (\Disque\Queue\JobNotAvailableException $e) {
        // Do something else while waiting!
        echo "Still waiting...\n";
        continue;
    }
    echo "GOT JOB!\n";
    var_dump($job->getBody());
    $queue->processed($job);
}
If you want to push jobs to the queue but not have them ready for processing until a certain time, you can take advantage of the schedule() method. This method takes the job as its first argument, and a DateTime (which should be set to the future) for when it should be ready. For example, to push a job and have it ready for processing in 15 seconds, we would do:
$job = new \Disque\Queue\Job(['name' => 'Mariano']);
$disque->queue('my_queue')->schedule($job, new \DateTime('+15 seconds'));
While testing this feature, you can wait for jobs every second to see it working as expected:
while (true) {
    try {
        $job = $disque->queue('my_queue')->pull(1000);
    } catch (\Disque\Queue\JobNotAvailableException $e) {
        echo "[" . date('Y-m-d H:i:s') . "] Waiting...\n";
        continue;
    }
    echo "[" . date('Y-m-d H:i:s') . "] GOT JOB!\n";
    var_dump($job->getBody());
}
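To make the relationship between a target DateTime and a delay concrete, here is a small standalone sketch that computes how many seconds lie between "now" and the scheduled time. The helper name secondsUntil() is hypothetical, and whether schedule() internally translates the DateTime into the delay option of addJob is an assumption, not something this document states.

```php
<?php
/**
 * Hypothetical helper: whole seconds between $now and $when.
 * Rejects targets that are not in the future.
 */
function secondsUntil(\DateTimeInterface $when, \DateTimeInterface $now): int
{
    $seconds = $when->getTimestamp() - $now->getTimestamp();
    if ($seconds <= 0) {
        throw new \InvalidArgumentException('The scheduled time must be in the future');
    }
    return $seconds;
}

// A fixed "now" keeps the example deterministic.
$now = new \DateTimeImmutable('2024-01-01 00:00:00', new \DateTimeZone('UTC'));
$when = $now->modify('+15 seconds');

$delay = secondsUntil($when, $now);
var_dump($delay);
```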
Once you have processed a job successfully, you will need to acknowledge it, to prevent Disque from putting it back on the queue (details from Disque itself). You do so via the processed() method, like so:
$disque->queue('my_queue')->processed($job);
If you are processing a job that takes a long time to complete, it is good practice to call processing(). That way Disque is informed that we are still working on the job, and does not requeue it under the assumption that the job could not be processed correctly. For example:
$queue = $disque->queue('my_queue');
$job = $queue->pull();
for ($i = 0; $i < 10; $i++) {
    // Every 2 seconds inform that we are working on the job
    sleep(2);
    $queue->processing($job);
}
// We are done with the job
$queue->processed($job);
You can choose to have your own Job classes when using the Queue API. To do so you start by implementing Disque\Queue\JobInterface, and make the class take whatever shape you deem necessary. For example:
class EmailJob implements \Disque\Queue\JobInterface
{
    private $id;
    public $email;
    public $subject;
    public $message;

    public function __construct($email, $subject, $message)
    {
        $this->email = $email;
        $this->subject = $subject;
        $this->message = $message ?: 'No message';
    }

    public function getId()
    {
        return $this->id;
    }

    public function setId($id)
    {
        $this->id = $id;
    }

    public function send()
    {
        echo "SEND EMAIL TO {$this->email}:\n";
        echo $this->subject . "\n\n";
        echo $this->message;
    }
}
You then have to create a Marshaler: a way for this job class to be serialized and deserialized. Marshalers should implement Disque\Queue\Marshal\MarshalerInterface. You would normally use JSON, but you are not required to. For example, to create a marshaler for the above EmailJob class we could do:
class EmailJobMarshaler implements \Disque\Queue\Marshal\MarshalerInterface
{
    public function unmarshal($source)
    {
        $body = @json_decode($source, true);
        if (is_null($body)) {
            throw new \Disque\Queue\Marshal\MarshalException("Could not deserialize {$source}");
        } elseif (!is_array($body) || empty($body['email']) || empty($body['subject'])) {
            throw new \Disque\Queue\Marshal\MarshalException('Not an email job');
        }
        // Default the optional message so the constructor always gets a value
        $body += ['message' => null];
        return new EmailJob($body['email'], $body['subject'], $body['message']);
    }

    public function marshal(\Disque\Queue\JobInterface $job)
    {
        if (!($job instanceof EmailJob)) {
            throw new \Disque\Queue\Marshal\MarshalException('Not an email job');
        }
        return json_encode([
            'email' => $job->email,
            'subject' => $job->subject,
            'message' => $job->message
        ]);
    }
}
As you can see, unmarshal() takes a string and should return an instance of Disque\Queue\JobInterface, or throw a Disque\Queue\Marshal\MarshalException if something went wrong. Similarly, marshal() takes a Disque\Queue\JobInterface and returns its string representation, throwing a Disque\Queue\Marshal\MarshalException if something went wrong.
To use this marshaler you create an instance of it, and set it via the queue's setMarshaler() method:
$queue = $disque->queue('emails');
$queue->setMarshaler(new EmailJobMarshaler());
You can now push jobs to the queue:
$job = new EmailJob('claudia@example.com', 'Hello world!', 'Hello from Disque :)');
$queue->push($job);
echo "JOB #{$job->getId()} pushed!\n";
When pulling jobs from the queue, you can take advantage of your custom job implementation:
while ($job = $queue->pull()) {
    echo "Got JOB #{$job->getId()}!\n";
    $job->send();
    $queue->processed($job);
}
When using the Client API directly (that is, using the commands provided by \Disque\Client), you will have to connect manually. You can connect via the connect() method. As recommended by Disque, the connection is done as follows:
- The list of hosts is used to pick a random server.
- A connection is attempted against the picked server. If it fails, another random node is tried.
- If a connection is successful, the HELLO command is issued against this server. If this fails, another random node is tried.
- If no connection is established and there are no servers left, a Disque\Connection\ConnectionException is thrown.
Example call:
$client = new \Disque\Client([
    '127.0.0.1:7711',
    '127.0.0.1:7712'
]);
$result = $client->connect();
var_dump($result);
The above connect() call will return an output similar to the following:
[
    'version' => 1,
    'id' => "7eff078744b72d24d9ab71db1fb600c48cf7ec2f",
    'nodes' => [
        [
            'id' => "7eff078744b72d24d9ab71db1fb600c48cf7ec2f",
            'host' => "127.0.0.1",
            'port' => "7711",
            'version' => "1"
        ],
        [
            'id' => "d8f6333f5386bae67a216e0365ea09323eadc127",
            'host' => "127.0.0.1",
            'port' => "7712",
            'version' => "1"
        ],
    ]
]
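The node selection procedure listed earlier in this section can be sketched in plain PHP as follows. This is only an illustration of the random-pick-with-fallback idea: connectToAnyNode() and the $tryConnect callback are hypothetical names introduced here, a generic \RuntimeException stands in for the library's exception, and the code is not taken from the library.

```php
<?php
/**
 * Try nodes in random order until one accepts the connection and answers HELLO.
 *
 * @param string[] $nodes      Addresses such as '127.0.0.1:7711'.
 * @param callable $tryConnect Hypothetical callback: takes an address and returns
 *                             the HELLO response array, or throws on failure.
 */
function connectToAnyNode(array $nodes, callable $tryConnect): array
{
    shuffle($nodes); // random order, each node is tried at most once
    foreach ($nodes as $address) {
        try {
            return ['address' => $address, 'hello' => $tryConnect($address)];
        } catch (\RuntimeException $e) {
            // Connection or HELLO failed: fall through to the next node.
        }
    }
    throw new \RuntimeException('No Disque node available');
}

// Usage with a fake callback where only the node on port 7712 is reachable.
$result = connectToAnyNode(
    ['127.0.0.1:7711', '127.0.0.1:7712'],
    function (string $address): array {
        if ($address !== '127.0.0.1:7712') {
            throw new \RuntimeException("Cannot connect to {$address}");
        }
        return ['version' => 1, 'id' => 'fake-node-id'];
    }
);
echo "Connected to {$result['address']}\n";
```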
By default disque-php does not require any other packages or libraries. It has its own connector to Disque, which is fast and focused. If you wish to instead use another connector to handle the connection with Disque, you can specify so via the connection manager's setConnectionClass() method. For example, if you wish to use predis (maybe because you are already using its PHP extension), you would first add predis to your Composer requirements:
$ composer require predis/predis --no-dev
And then configure the connection implementation class:
$client->getConnectionManager()->setConnectionClass(\Disque\Connection\Predis::class);
Disque suggests that if a consumer sees a high message rate received from a specific node, then clients should connect to that node directly to reduce the number of messages between nodes. To achieve this, disque-php's connection manager has a method that allows you to specify how many jobs must be produced by a specific node before the connection is automatically switched to that node. For example if we do:
$disque = new \Disque\Client([
    '127.0.0.1:7711',
    '127.0.0.2:7712'
]);
$disque->connect();
We are currently connected to one of these nodes (with no guarantee as to which one, since nodes are selected randomly). Say that we are connected to the node at port 7711. By default no automatic connection change will take place, so we will always be connected to the selected node. Say that we want to switch to another node the moment that node has produced at least 3 jobs. We first set this option via the manager:
$disque->getConnectionManager()->setMinimumJobsToChangeNode(3);
Now we process jobs as we normally do:
while ($job = $disque->getJob()) {
    echo 'DO SOMETHING!';
    var_dump($job['body']);
    $disque->ackJob($job['id']);
}
If a specific node produces at least 3 jobs, the connection will automatically switch to the node producing that many jobs. This is all done behind the scenes.
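The bookkeeping behind such a switch can be pictured with the standalone sketch below. NodeSwitchAdvisor is a hypothetical class written for this illustration: it counts jobs per producing node and reports when a node other than the current one reaches the threshold. How the library actually tracks producers, and when it resets its counters, is not described in this document, so those details are assumptions.

```php
<?php
/**
 * Hypothetical helper: counts jobs per producing node and says when
 * a node other than the current one has reached the threshold.
 */
final class NodeSwitchAdvisor
{
    private $currentNode;
    private $minimumJobs;
    private $counts = [];

    public function __construct(string $currentNode, int $minimumJobs)
    {
        $this->currentNode = $currentNode;
        $this->minimumJobs = $minimumJobs;
    }

    /** Record one job produced by $nodeId; return the node to switch to, or null. */
    public function record(string $nodeId): ?string
    {
        $this->counts[$nodeId] = ($this->counts[$nodeId] ?? 0) + 1;
        if ($nodeId !== $this->currentNode && $this->counts[$nodeId] >= $this->minimumJobs) {
            $this->currentNode = $nodeId;
            $this->counts = []; // assumption: start counting afresh after a switch
            return $nodeId;
        }
        return null;
    }
}

// Connected to node-7711, switch once another node has produced 3 jobs.
$advisor = new NodeSwitchAdvisor('node-7711', 3);
$decisions = [];
foreach (['node-7712', 'node-7711', 'node-7712', 'node-7712'] as $producer) {
    $decisions[] = $advisor->record($producer);
}
var_dump($decisions);
```

In this run only the fourth job triggers a switch, because it is the third job produced by node-7712.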
Currently all Disque commands are implemented, and can be executed via the Disque\Client class. Once you have established a connection, you can run any of the following commands.
Acknowledges the execution of one or more jobs via job IDs. Signature:
ackJob(string... $ids): int
Arguments:
- string... $ids: Each job ID as an argument
Return value:
- int: The number of jobs acknowledged
Example call:
$jobCount = $client->ackJob('jobid1', 'jobid2');
Adds a job to the specified queue. Signature:
addJob(string $queue, string $payload, array $options = []): string
Arguments:
- $queue: The name of the queue where to create the job. If no queue with that name exists, it will be created automatically. Queues are also automatically removed when they hold no pending jobs.
- $payload: Payload of the job. This is usually a JSON encoded set of arguments, but you can specify whatever string you want.
- $options: Set of options, amongst:
  - timeout: an int, which specifies the timeout in milliseconds for the job. See Disque's API.
  - replicate: an int, to specify the number of nodes the job should be replicated to.
  - delay: an int, to specify the number of seconds that should elapse before the job is queued by any server.
  - retry: an int, to specify the period (in seconds) after which, if the job is not acknowledged, the job is put again into the queue for delivery. See Disque's API.
  - ttl: an int, which is the maximum job life in seconds.
  - maxlen: an int, to specify that if there are already this many jobs queued in the given queue, then this new job is refused.
  - async: a bool, if true, tells the server to let the command return ASAP and replicate the job to the other nodes in background. See Disque's API.
Return value:
- string: the job ID
Example call:
$jobId = $client->addJob('queue', json_encode(['name' => 'Mariano']));
var_dump($jobId);
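Because the options mix units (timeout is in milliseconds, while delay, retry and ttl are in seconds), it may help to see them side by side. The sketch below only builds the payload and an options array; the values are arbitrary examples chosen for illustration, and the actual addJob() call is left as a comment since it needs a connected client and a running Disque node.

```php
<?php
// Payload: usually a JSON encoded set of arguments.
$payload = json_encode(['name' => 'Mariano']);

// Example values only; every key is optional.
$options = [
    'timeout'   => 500,   // milliseconds
    'replicate' => 2,     // number of nodes to replicate the job to
    'delay'     => 10,    // seconds before the job is queued
    'retry'     => 30,    // seconds before an unacknowledged job is requeued
    'ttl'       => 3600,  // maximum job life, in seconds
    'maxlen'    => 1000,  // refuse the job if the queue already holds this many
    'async'     => false, // true returns early and replicates in background
];

// With a connected client this would be passed as the third argument:
// $jobId = $client->addJob('queue', $payload, $options);
var_dump($payload, $options);
```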
Completely delete a job from a specific node. Signature:
delJob(string... $ids): int
Arguments:
- string... $ids: Each job ID as an argument
Return value:
- int: The number of jobs removed
Example call:
$jobCount = $client->delJob('jobid1', 'jobid2');
Remove the given jobs from the queue. Signature:
dequeue(string... $ids): int
Arguments:
- string... $ids: Each job ID as an argument
Return value:
- int: The number of jobs dequeued
Example call:
$jobCount = $client->dequeue('jobid1', 'jobid2');
Queue the given jobs, if not already queued. Signature:
enqueue(string... $ids): int
Arguments:
- string... $ids: Each job ID as an argument
Return value:
- int: The number of jobs enqueued
Example call:
$jobCount = $client->enqueue('jobid1', 'jobid2');
Acknowledges the execution of one or more jobs via job IDs, using a faster approach than ACKJOB. See Disque's API to understand the difference with ACKJOB and decide when to use which. Signature:
fastAck(string... $ids): int
Arguments:
- string... $ids: Each job ID as an argument
Return value:
- int: The number of jobs acknowledged
Example call:
$jobCount = $client->fastAck('jobid1', 'jobid2');
Gets a job (or several jobs if the option count is used) from the specified queues. Signature:
getJob(string... $queues, array $options = []): array
Arguments:
- $queues: The set of queues from where to fetch jobs.
- $options: Set of options, amongst:
  - timeout: an int, which specifies the timeout in milliseconds to wait for jobs. If no jobs are available and this timeout expires, then no jobs are returned.
  - count: an int, to specify the number of jobs you wish to obtain.
Return value:
- array: A set of jobs, where each job is an indexed array with:
  - queue: a string, that indicates which queue this job came from.
  - id: a string, which is the job ID.
  - body: a string, which is the payload of the job.
Example call:
$jobs = $client->getJob('queue1', 'queue2', [
    'timeout' => 3000
]);
if (empty($jobs)) {
    die('NO JOBS!');
}
$job = $jobs[0];
echo "QUEUE: {$job['queue']}\n";
echo "ID: {$job['id']}\n";
var_dump(json_decode($job['body'], true));
Returns information from the connected node. You would normally not need to use this, as it is used during the connection handshake. Signature:
hello(): array
Arguments:
- None
Return value:
- array: Indexed array with:
  - version: a string, which indicates the HELLO format version.
  - id: a string, which is the ID of the Disque node we are connected to.
  - nodes: an array, which is a set of nodes, where each node is an indexed array with:
    - id: a string, which is the ID of this Disque node.
    - host: a string, which is the host where this node is listening.
    - port: an int, which is the port where this node is listening.
    - version: a string, which indicates the HELLO format version.
Example call:
$hello = $client->hello();
var_dump($hello);
Get generic server information and statistics. You would normally not need to use this. Signature:
info(): string
Arguments:
- None
Return value:
- string: A big string with information about the connected node.
Example call:
$info = $client->info();
echo $info;
The length of the queue, that is, the number of jobs available in the given queue. Signature:
qlen(string $queue): int
Arguments:
- $queue: Queue from which to get the number of jobs available.
Return value:
- int: Queue length.
Example call:
$count = $client->qlen('queue');
var_dump($count);
Gets the given number of jobs from the given queue without consuming them (so they will still be pending in the queue). Signature:
qpeek(string $queue, int $count): array
Arguments:
- $queue: The queue from where to look for jobs.
- $count: an int, to specify the number of jobs you wish to obtain. If this number is negative, that many of the newest jobs are returned instead.
Return value:
- array: A set of jobs, where each job is an indexed array with:
  - queue: a string, that indicates which queue this job came from.
  - id: a string, which is the job ID.
  - body: a string, which is the payload of the job.
Example call:
$jobs = $client->qpeek('queue', 1);
if (empty($jobs)) {
    die('NO JOBS!');
}
$job = $jobs[0];
echo "ID: {$job['id']}\n";
var_dump(json_decode($job['body'], true));
Iterate all existing queues on the node that the client is connected to, allowing navigation with a cursor. As specified by Disque this command may return duplicated elements.
qscan(int $cursor = 0, array $options = [])
Arguments:
- $cursor: an int, which is the cursor we are navigating. On first call this should be 0; when following an already established cursor this should be the cursor returned by the previous call (see nextCursor).
- $options: an array, containing the set of options to influence the scan. Available options:
  - count: an int, a hint about how much work to do per iteration.
  - busyloop: a bool. If set to true the call will block and will return all elements in a single iteration.
  - minlen: an int. Do not include any queues with less than the given number of jobs queued.
  - maxlen: an int. Do not include any queues with more than the given number of jobs queued.
  - importrate: an int. Only include queues with a job import rate (from other nodes) higher than or equal to the given number.
Return value:
- array: An indexed array with:
  - finished: a bool, which tells if this is the last iteration.
  - nextCursor: an int, which tells the cursor to use to get the next iteration. If 0 then this is the last iteration (which also guarantees that finished is set to true).
  - queues: an array, where each element is a queue name.
Example call:
// Get all queues, one queue at a time
$cursor = 0;
do {
    $result = $client->qscan($cursor, ['count' => 1]);
    var_dump($result['queues']);
    $cursor = $result['nextCursor'];
} while (!$result['finished']);
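Since the scan may return duplicated elements, callers that need a clean list have to de-duplicate across iterations themselves. The sketch below shows one way to do that. collectQueueNames() and the $scan callback are hypothetical: the callback stands in for $client->qscan() and is fed fake pages here so the example runs without a Disque node.

```php
<?php
/**
 * Walk a cursor-based scan to the end and return each queue name once.
 *
 * @param callable $scan Hypothetical stand-in for $client->qscan(): takes a cursor and
 *                       returns ['finished' => bool, 'nextCursor' => int, 'queues' => string[]].
 */
function collectQueueNames(callable $scan): array
{
    $seen = [];
    $cursor = 0;
    do {
        $result = $scan($cursor);
        foreach ($result['queues'] as $name) {
            $seen[$name] = true; // array keys de-duplicate repeated names
        }
        $cursor = $result['nextCursor'];
    } while (!$result['finished']);

    // PHP turns numeric-looking keys into ints, so cast back to strings.
    return array_map('strval', array_keys($seen));
}

// Fake scan pages keyed by cursor; 'reports' appears twice on purpose.
$pages = [
    0 => ['finished' => false, 'nextCursor' => 5, 'queues' => ['emails', 'reports']],
    5 => ['finished' => true, 'nextCursor' => 0, 'queues' => ['reports', 'thumbnails']],
];
$queues = collectQueueNames(function (int $cursor) use ($pages): array {
    return $pages[$cursor];
});
var_dump($queues);
```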
Get information about the given job. Signature:
show(string $id): array
Arguments:
- string $id: job ID
Return value:
- array: An indexed array with information about the job, including (but not limited to) queue, state, ttl, delay, retry, body, nodes-delivered, nodes-confirmed. See Disque's API.
Example call:
$details = $client->show('jobid1');
var_dump($details);
Claims to be still working on the specified job, and asks Disque to postpone the next delivery of the job. Signature:
working(string $id): int
Arguments:
- string $id: job ID
Return value:
- int: Number of seconds you (likely) postponed the message visibility for other workers. See Disque's API.
Example call:
$seconds = $client->working('jobid1');
var_dump($seconds);