Skip to content

Commit

Permalink
Adding working() method to Queue API
Browse files Browse the repository at this point in the history
  • Loading branch information
mariano committed May 17, 2015
1 parent 668ab93 commit 8f8cb5a
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,7 @@ All Notable changes will be documented in this file. This project adheres to

### Added
- Added support for `WORKING`.
- Added `processing()` method to Queue API.

## [1.2.1] - 2015-05-14

Expand Down
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -104,6 +104,7 @@ instead of using the issue tracker.
- [x] Method in `Queue` to schedule future jobs based on DateTime
- [x] `QSCAN`, `WORKING`
- [ ] Add support for AUTH based connections
- [x] Add support to `WORKING` command on Queue API
- [ ] `QSTAT` when they are implemented upstream

## Acknowledgments
Expand Down
18 changes: 18 additions & 0 deletions docs/README.md
Expand Up @@ -186,6 +186,24 @@ You do so via the `processed()` method, like so:
$disque->queue('my_queue')->processed($job);
```

### Jobs that consume a long time to process

If you are processing a job that requires a long time to be done, it is good
practice to call `processing()`, that way Disque is informed that we are still
working on the job, and avoids it from being requeued under the assumption
that the job could not be processed correctly. For example:

```php
$queue = $disque->queue('my_queue');
$job = $queue->pull();
for ($i=0; $i < 10; $i++) {
// Every 2 seconds inform that we are working on the job
$queue->processing($job);
}
// We are done with the job
$queue->processed($job);
```

## Changing the Job class

You can choose to have your own Job classes when using the Queue API. To do
Expand Down
12 changes: 12 additions & 0 deletions src/Queue/Queue.php
Expand Up @@ -131,6 +131,18 @@ public function pull($timeout = 0)
return $job;
}

/**
* Marks that a Job is still being processed
*
* @param JobInterface $job Job
* @return int Number of seconds that the job visibility was postponed
*/
public function processing(JobInterface $job)
{
$this->checkConnected();
return $this->client->working($job->getId());
}

/**
* Acknowledges a Job as properly handled
*
Expand Down
51 changes: 51 additions & 0 deletions tests/Queue/QueueTest.php
Expand Up @@ -389,4 +389,55 @@ public function testScheduleWayInTheFuture()
$result = $queue->schedule($job, new DateTime('+25 days', new DateTimeZone(Queue::DEFAULT_JOB_TIMEZONE)));
$this->assertSame($job, $result);
}

public function testProcessingConnected()
{
$job = m::mock(JobInterface::class)
->shouldReceive('getId')
->with()
->andReturn('JOB_ID')
->once()
->mock();

$client = m::mock(Client::class)
->shouldReceive('isConnected')
->with()
->andReturn(true)
->once()
->shouldReceive('working')
->with('JOB_ID')
->andReturn(3)
->mock();

$q = new Queue($client, 'queue');
$result = $q->processing($job);
$this->assertSame(3, $result);
}

public function testWorkingNotConnected()
{
$job = m::mock(JobInterface::class)
->shouldReceive('getId')
->with()
->andReturn('JOB_ID')
->once()
->mock();

$client = m::mock(Client::class)
->shouldReceive('isConnected')
->with()
->andReturn(false)
->once()
->shouldReceive('connect')
->with()
->once()
->shouldReceive('working')
->with('JOB_ID')
->andReturn(3)
->mock();

$q = new Queue($client, 'queue');
$result = $q->processing($job);
$this->assertSame(3, $result);
}
}

0 comments on commit 8f8cb5a

Please sign in to comment.