mirror of
https://github.com/nextcloud/server.git
synced 2026-04-21 06:08:46 -04:00
Merge pull request #39473 from nextcloud/fix/parallel-aware-job
fix(IParallelAwareJob): Check for other reserved jobs before setting new ones as reserved
This commit is contained in:
commit
53392861ff
5 changed files with 111 additions and 71 deletions
|
|
@ -35,6 +35,7 @@ use OCP\AppFramework\Utility\ITimeFactory;
|
|||
use OCP\AutoloadNotAllowedException;
|
||||
use OCP\BackgroundJob\IJob;
|
||||
use OCP\BackgroundJob\IJobList;
|
||||
use OCP\BackgroundJob\IParallelAwareJob;
|
||||
use OCP\DB\Exception;
|
||||
use OCP\DB\QueryBuilder\IQueryBuilder;
|
||||
use OCP\IConfig;
|
||||
|
|
@ -218,19 +219,33 @@ class JobList implements IJobList {
|
|||
$query->andWhere($query->expr()->eq('time_sensitive', $query->createNamedParameter(IJob::TIME_SENSITIVE, IQueryBuilder::PARAM_INT)));
|
||||
}
|
||||
|
||||
$update = $this->connection->getQueryBuilder();
|
||||
$update->update('jobs')
|
||||
->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime()))
|
||||
->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime()))
|
||||
->where($update->expr()->eq('id', $update->createParameter('jobid')))
|
||||
->andWhere($update->expr()->eq('reserved_at', $update->createParameter('reserved_at')))
|
||||
->andWhere($update->expr()->eq('last_checked', $update->createParameter('last_checked')));
|
||||
|
||||
$result = $query->executeQuery();
|
||||
$row = $result->fetch();
|
||||
$result->closeCursor();
|
||||
|
||||
if ($row) {
|
||||
$job = $this->buildJob($row);
|
||||
|
||||
if ($job instanceof IParallelAwareJob && !$job->getAllowParallelRuns() && $this->hasReservedJob(get_class($job))) {
|
||||
$this->logger->debug('Skipping ' . get_class($job) . ' job with ID ' . $job->getId() . ' because another job with the same class is already running', ['app' => 'cron']);
|
||||
|
||||
$update = $this->connection->getQueryBuilder();
|
||||
$update->update('jobs')
|
||||
->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime() + 1))
|
||||
->where($update->expr()->eq('id', $update->createParameter('jobid')));
|
||||
$update->setParameter('jobid', $row['id']);
|
||||
$update->executeStatement();
|
||||
|
||||
return $this->getNext($onlyTimeSensitive);
|
||||
}
|
||||
|
||||
$update = $this->connection->getQueryBuilder();
|
||||
$update->update('jobs')
|
||||
->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime()))
|
||||
->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime()))
|
||||
->where($update->expr()->eq('id', $update->createParameter('jobid')))
|
||||
->andWhere($update->expr()->eq('reserved_at', $update->createParameter('reserved_at')))
|
||||
->andWhere($update->expr()->eq('last_checked', $update->createParameter('last_checked')));
|
||||
$update->setParameter('jobid', $row['id']);
|
||||
$update->setParameter('reserved_at', $row['reserved_at']);
|
||||
$update->setParameter('last_checked', $row['last_checked']);
|
||||
|
|
@ -240,7 +255,6 @@ class JobList implements IJobList {
|
|||
// Background job already executed elsewhere, try again.
|
||||
return $this->getNext($onlyTimeSensitive);
|
||||
}
|
||||
$job = $this->buildJob($row);
|
||||
|
||||
if ($job === null) {
|
||||
// set the last_checked to 12h in the future to not check failing jobs all over again
|
||||
|
|
|
|||
|
|
@ -75,11 +75,6 @@ abstract class Job implements IJob, IParallelAwareJob {
|
|||
$jobList->setLastRun($this);
|
||||
$logger = $this->logger ?? \OCP\Server::get(LoggerInterface::class);
|
||||
|
||||
if (!$this->getAllowParallelRuns() && $jobList->hasReservedJob(get_class($this))) {
|
||||
$logger->debug('Skipping ' . get_class($this) . ' job with ID ' . $this->getId() . ' because another job with the same class is already running', ['app' => 'cron']);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$jobStartTime = $this->time->getTime();
|
||||
$logger->debug('Run ' . get_class($this) . ' job with ID ' . $this->getId(), ['app' => 'cron']);
|
||||
|
|
|
|||
|
|
@ -250,18 +250,64 @@ class JobListTest extends TestCase {
|
|||
|
||||
public function testHasReservedJobs() {
|
||||
$this->clearJobsList();
|
||||
|
||||
$this->timeFactory->expects($this->atLeastOnce())
|
||||
->method('getTime')
|
||||
->willReturn(123456789);
|
||||
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
$this->assertTrue($this->instance->hasReservedJob());
|
||||
$this->assertTrue($this->instance->hasReservedJob(TestJob::class));
|
||||
});
|
||||
$this->instance->add($job);
|
||||
|
||||
$job2 = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
|
||||
$this->instance->add($job, 1);
|
||||
$this->instance->add($job2, 2);
|
||||
|
||||
$this->assertCount(2, iterator_to_array($this->instance->getJobsIterator(null, 10, 0)));
|
||||
|
||||
$this->assertFalse($this->instance->hasReservedJob());
|
||||
$this->assertFalse($this->instance->hasReservedJob(TestJob::class));
|
||||
|
||||
$job->start($this->instance);
|
||||
$job = $this->instance->getNext();
|
||||
$this->assertNotNull($job);
|
||||
$this->assertTrue($this->instance->hasReservedJob());
|
||||
$this->assertTrue($this->instance->hasReservedJob(TestJob::class));
|
||||
$job = $this->instance->getNext();
|
||||
$this->assertNotNull($job);
|
||||
$this->assertTrue($this->instance->hasReservedJob());
|
||||
$this->assertTrue($this->instance->hasReservedJob(TestJob::class));
|
||||
}
|
||||
|
||||
$this->assertTrue($this->ran);
|
||||
public function testHasReservedJobsAndParallelAwareJob() {
|
||||
$this->clearJobsList();
|
||||
|
||||
$this->timeFactory->expects($this->atLeastOnce())
|
||||
->method('getTime')
|
||||
->willReturnCallback(function () use (&$time) {
|
||||
return time();
|
||||
});
|
||||
|
||||
$job = new TestParallelAwareJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
|
||||
$job2 = new TestParallelAwareJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
|
||||
$this->instance->add($job, 1);
|
||||
$this->instance->add($job2, 2);
|
||||
|
||||
$this->assertCount(2, iterator_to_array($this->instance->getJobsIterator(null, 10, 0)));
|
||||
|
||||
$this->assertFalse($this->instance->hasReservedJob());
|
||||
$this->assertFalse($this->instance->hasReservedJob(TestParallelAwareJob::class));
|
||||
|
||||
$job = $this->instance->getNext();
|
||||
$this->assertNotNull($job);
|
||||
$this->assertTrue($this->instance->hasReservedJob());
|
||||
$this->assertTrue($this->instance->hasReservedJob(TestParallelAwareJob::class));
|
||||
$job = $this->instance->getNext();
|
||||
$this->assertNull($job); // Job doesn't allow parallel runs
|
||||
}
|
||||
|
||||
public function markRun() {
|
||||
|
|
|
|||
|
|
@ -61,58 +61,6 @@ class JobTest extends \Test\TestCase {
|
|||
$this->assertCount(1, $jobList->getAll());
|
||||
}
|
||||
|
||||
public function testDisallowParallelRunsWithNoOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(false);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, false);
|
||||
$jobList->setHasReservedJob(TestJob::class, false);
|
||||
$job->start($jobList);
|
||||
$this->assertTrue($this->run);
|
||||
}
|
||||
|
||||
public function testAllowParallelRunsWithNoOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(true);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, false);
|
||||
$jobList->setHasReservedJob(TestJob::class, false);
|
||||
$job->start($jobList);
|
||||
$this->assertTrue($this->run);
|
||||
}
|
||||
|
||||
public function testAllowParallelRunsWithOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(true);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, true);
|
||||
$jobList->setHasReservedJob(TestJob::class, true);
|
||||
$job->start($jobList);
|
||||
$this->assertTrue($this->run);
|
||||
}
|
||||
|
||||
public function testDisallowParallelRunsWithOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(false);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, true);
|
||||
$jobList->setHasReservedJob(TestJob::class, true);
|
||||
$job->start($jobList);
|
||||
$this->assertFalse($this->run);
|
||||
}
|
||||
|
||||
public function markRun() {
|
||||
$this->run = true;
|
||||
}
|
||||
|
|
|
|||
37
tests/lib/BackgroundJob/TestParallelAwareJob.php
Normal file
37
tests/lib/BackgroundJob/TestParallelAwareJob.php
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2014 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace Test\BackgroundJob;
|
||||
|
||||
use OCP\AppFramework\Utility\ITimeFactory;
|
||||
|
||||
class TestParallelAwareJob extends \OCP\BackgroundJob\Job {
|
||||
private $testCase;
|
||||
|
||||
/**
|
||||
* @var callable $callback
|
||||
*/
|
||||
private $callback;
|
||||
|
||||
/**
|
||||
* @param JobTest $testCase
|
||||
* @param callable $callback
|
||||
*/
|
||||
public function __construct(ITimeFactory $time = null, $testCase = null, $callback = null) {
|
||||
parent::__construct($time ?? \OC::$server->get(ITimeFactory::class));
|
||||
$this->setAllowParallelRuns(false);
|
||||
$this->testCase = $testCase;
|
||||
$this->callback = $callback;
|
||||
}
|
||||
|
||||
public function run($argument) {
|
||||
$this->testCase->markRun();
|
||||
$callback = $this->callback;
|
||||
$callback($argument);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue