diff --git a/CHANGELOG.md b/CHANGELOG.md index 79cf86a..bcdfc88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ All Notable changes will be documented in this file. This project adheres to ### Added - Added support for `WORKING`. +- Added `processing()` method to Queue API. ## [1.2.1] - 2015-05-14 diff --git a/README.md b/README.md index 18db878..17a67f8 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,7 @@ instead of using the issue tracker. - [x] Method in `Queue` to schedule future jobs based on DateTime - [x] `QSCAN`, `WORKING` - [ ] Add support for AUTH based connections +- [x] Add support to `WORKING` command on Queue API - [ ] `QSTAT` when they are implemented upstream ## Acknowledgments diff --git a/docs/README.md b/docs/README.md index 01b9d29..8bf761f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -186,6 +186,24 @@ You do so via the `processed()` method, like so: $disque->queue('my_queue')->processed($job); ``` +### Jobs that consume a long time to process + +If you are processing a job that requires a long time to be done, it is good +practice to call `processing()`, that way Disque is informed that we are still +working on the job, and avoids it from being requeued under the assumption +that the job could not be processed correctly. For example: + +```php +$queue = $disque->queue('my_queue'); +$job = $queue->pull(); +for ($i=0; $i < 10; $i++) { + // Every 2 seconds inform that we are working on the job + $queue->processing($job); +} +// We are done with the job +$queue->processed($job); +``` + ## Changing the Job class You can choose to have your own Job classes when using the Queue API. To do diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index 27ed277..5115cce 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -131,6 +131,18 @@ public function pull($timeout = 0) return $job; } + /** + * Marks that a Job is still being processed + * + * @param JobInterface $job Job + * @return int Number of seconds that the job visibility was postponed + */ + public function processing(JobInterface $job) + { + $this->checkConnected(); + return $this->client->working($job->getId()); + } + /** * Acknowledges a Job as properly handled * diff --git a/tests/Queue/QueueTest.php b/tests/Queue/QueueTest.php index f6ae98a..d8e864a 100644 --- a/tests/Queue/QueueTest.php +++ b/tests/Queue/QueueTest.php @@ -389,4 +389,55 @@ public function testScheduleWayInTheFuture() $result = $queue->schedule($job, new DateTime('+25 days', new DateTimeZone(Queue::DEFAULT_JOB_TIMEZONE))); $this->assertSame($job, $result); } + + public function testProcessingConnected() + { + $job = m::mock(JobInterface::class) + ->shouldReceive('getId') + ->with() + ->andReturn('JOB_ID') + ->once() + ->mock(); + + $client = m::mock(Client::class) + ->shouldReceive('isConnected') + ->with() + ->andReturn(true) + ->once() + ->shouldReceive('working') + ->with('JOB_ID') + ->andReturn(3) + ->mock(); + + $q = new Queue($client, 'queue'); + $result = $q->processing($job); + $this->assertSame(3, $result); + } + + public function testWorkingNotConnected() + { + $job = m::mock(JobInterface::class) + ->shouldReceive('getId') + ->with() + ->andReturn('JOB_ID') + ->once() + ->mock(); + + $client = m::mock(Client::class) + ->shouldReceive('isConnected') + ->with() + ->andReturn(false) + ->once() + ->shouldReceive('connect') + ->with() + ->once() + ->shouldReceive('working') + ->with('JOB_ID') + ->andReturn(3) + ->mock(); + + $q = new Queue($client, 'queue'); + $result = $q->processing($job); + $this->assertSame(3, $result); + } } \ No newline at end of file