mirror of
https://github.com/nextcloud/server.git
synced 2026-06-11 01:30:50 -04:00
Merge pull request #37835 from nextcloud/feat/background-allow-parallel-runs
feat(BackgroundJobs): Allow preventing parallel runs for a job class
This commit is contained in:
commit
7250f54ca3
11 changed files with 217 additions and 13 deletions
|
|
@ -120,6 +120,7 @@ return array(
|
|||
'OCP\\AutoloadNotAllowedException' => $baseDir . '/lib/public/AutoloadNotAllowedException.php',
|
||||
'OCP\\BackgroundJob\\IJob' => $baseDir . '/lib/public/BackgroundJob/IJob.php',
|
||||
'OCP\\BackgroundJob\\IJobList' => $baseDir . '/lib/public/BackgroundJob/IJobList.php',
|
||||
'OCP\\BackgroundJob\\IParallelAwareJob' => $baseDir . '/lib/public/BackgroundJob/IParallelAwareJob.php',
|
||||
'OCP\\BackgroundJob\\Job' => $baseDir . '/lib/public/BackgroundJob/Job.php',
|
||||
'OCP\\BackgroundJob\\QueuedJob' => $baseDir . '/lib/public/BackgroundJob/QueuedJob.php',
|
||||
'OCP\\BackgroundJob\\TimedJob' => $baseDir . '/lib/public/BackgroundJob/TimedJob.php',
|
||||
|
|
|
|||
|
|
@ -153,6 +153,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
|
|||
'OCP\\AutoloadNotAllowedException' => __DIR__ . '/../../..' . '/lib/public/AutoloadNotAllowedException.php',
|
||||
'OCP\\BackgroundJob\\IJob' => __DIR__ . '/../../..' . '/lib/public/BackgroundJob/IJob.php',
|
||||
'OCP\\BackgroundJob\\IJobList' => __DIR__ . '/../../..' . '/lib/public/BackgroundJob/IJobList.php',
|
||||
'OCP\\BackgroundJob\\IParallelAwareJob' => __DIR__ . '/../../..' . '/lib/public/BackgroundJob/IParallelAwareJob.php',
|
||||
'OCP\\BackgroundJob\\Job' => __DIR__ . '/../../..' . '/lib/public/BackgroundJob/Job.php',
|
||||
'OCP\\BackgroundJob\\QueuedJob' => __DIR__ . '/../../..' . '/lib/public/BackgroundJob/QueuedJob.php',
|
||||
'OCP\\BackgroundJob\\TimedJob' => __DIR__ . '/../../..' . '/lib/public/BackgroundJob/TimedJob.php',
|
||||
|
|
|
|||
|
|
@ -35,19 +35,23 @@ use OCP\AppFramework\Utility\ITimeFactory;
|
|||
use OCP\AutoloadNotAllowedException;
|
||||
use OCP\BackgroundJob\IJob;
|
||||
use OCP\BackgroundJob\IJobList;
|
||||
use OCP\DB\Exception;
|
||||
use OCP\DB\QueryBuilder\IQueryBuilder;
|
||||
use OCP\IConfig;
|
||||
use OCP\IDBConnection;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
class JobList implements IJobList {
|
||||
protected IDBConnection $connection;
|
||||
protected IConfig $config;
|
||||
protected ITimeFactory $timeFactory;
|
||||
protected LoggerInterface $logger;
|
||||
|
||||
public function __construct(IDBConnection $connection, IConfig $config, ITimeFactory $timeFactory) {
|
||||
public function __construct(IDBConnection $connection, IConfig $config, ITimeFactory $timeFactory, LoggerInterface $logger) {
|
||||
$this->connection = $connection;
|
||||
$this->config = $config;
|
||||
$this->timeFactory = $timeFactory;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -382,4 +386,26 @@ class JobList implements IJobList {
|
|||
->where($query->expr()->eq('id', $query->createNamedParameter($job->getId()), IQueryBuilder::PARAM_INT));
|
||||
$query->executeStatement();
|
||||
}
|
||||
|
||||
public function hasReservedJob(?string $className = null): bool {
|
||||
$query = $this->connection->getQueryBuilder();
|
||||
$query->select('*')
|
||||
->from('jobs')
|
||||
->where($query->expr()->neq('reserved_at', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT)))
|
||||
->setMaxResults(1);
|
||||
|
||||
if ($className !== null) {
|
||||
$query->andWhere($query->expr()->eq('class', $query->createNamedParameter($className)));
|
||||
}
|
||||
|
||||
try {
|
||||
$result = $query->executeQuery();
|
||||
$hasReservedJobs = $result->fetch() !== false;
|
||||
$result->closeCursor();
|
||||
return $hasReservedJobs;
|
||||
} catch (Exception $e) {
|
||||
$this->logger->debug('Querying reserved jobs failed', ['exception' => $e]);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ class TranscriptionJob extends QueuedJob {
|
|||
private LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct($timeFactory);
|
||||
$this->setAllowParallelRuns(false);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -145,4 +145,13 @@ interface IJobList {
|
|||
* @since 23.0.0
|
||||
*/
|
||||
public function resetBackgroundJob(IJob $job): void;
|
||||
|
||||
/**
|
||||
* Checks whether a job of the passed class is reserved to run
|
||||
*
|
||||
* @param string|null $className
|
||||
* @return bool
|
||||
* @since 27.0.0
|
||||
*/
|
||||
public function hasReservedJob(?string $className): bool;
|
||||
}
|
||||
|
|
|
|||
47
lib/public/BackgroundJob/IParallelAwareJob.php
Normal file
47
lib/public/BackgroundJob/IParallelAwareJob.php
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
|
||||
/**
|
||||
* @copyright Copyright (c) 2023, Marcel Klehr <mklehr@gmx.net>
|
||||
*
|
||||
* @author Marcel Klehr <mklehr@gmx.net>
|
||||
*
|
||||
* @license GNU AGPL version 3 or any later version
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
namespace OCP\BackgroundJob;
|
||||
|
||||
/**
|
||||
* @since 27.0.0
|
||||
*/
|
||||
interface IParallelAwareJob {
|
||||
/**
|
||||
* Set this to false to prevent two Jobs from the same class from running in parallel
|
||||
*
|
||||
* @param bool $allow
|
||||
* @return void
|
||||
* @since 27.0.0
|
||||
*/
|
||||
public function setAllowParallelRuns(bool $allow): void;
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
* @since 27.0.0
|
||||
*/
|
||||
public function getAllowParallelRuns(): bool;
|
||||
}
|
||||
|
|
@ -38,11 +38,13 @@ use Psr\Log\LoggerInterface;
|
|||
*
|
||||
* @since 15.0.0
|
||||
*/
|
||||
abstract class Job implements IJob {
|
||||
abstract class Job implements IJob, IParallelAwareJob {
|
||||
protected int $id = 0;
|
||||
protected int $lastRun = 0;
|
||||
protected $argument;
|
||||
protected ITimeFactory $time;
|
||||
protected bool $allowParallelRuns = true;
|
||||
private ?ILogger $logger = null;
|
||||
|
||||
/**
|
||||
* @since 15.0.0
|
||||
|
|
@ -61,6 +63,7 @@ abstract class Job implements IJob {
|
|||
* @since 15.0.0
|
||||
*/
|
||||
public function execute(IJobList $jobList, ILogger $logger = null) {
|
||||
$this->logger = $logger;
|
||||
$this->start($jobList);
|
||||
}
|
||||
|
||||
|
|
@ -70,7 +73,12 @@ abstract class Job implements IJob {
|
|||
*/
|
||||
public function start(IJobList $jobList): void {
|
||||
$jobList->setLastRun($this);
|
||||
$logger = \OCP\Server::get(LoggerInterface::class);
|
||||
$logger = $this->logger ?? \OCP\Server::get(LoggerInterface::class);
|
||||
|
||||
if (!$this->getAllowParallelRuns() && $jobList->hasReservedJob(get_class($this))) {
|
||||
$logger->debug('Skipping ' . get_class($this) . ' job with ID ' . $this->getId() . ' because another job with the same class is already running', ['app' => 'cron']);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$jobStartTime = $this->time->getTime();
|
||||
|
|
@ -80,7 +88,7 @@ abstract class Job implements IJob {
|
|||
|
||||
$logger->debug('Finished ' . get_class($this) . ' job with ID ' . $this->getId() . ' in ' . $timeTaken . ' seconds', ['app' => 'cron']);
|
||||
$jobList->setExecutionTime($this, $timeTaken);
|
||||
} catch (\Exception $e) {
|
||||
} catch (\Throwable $e) {
|
||||
if ($logger) {
|
||||
$logger->error('Error while running background job (class: ' . get_class($this) . ', arguments: ' . print_r($this->argument, true) . ')', [
|
||||
'app' => 'core',
|
||||
|
|
@ -132,6 +140,25 @@ abstract class Job implements IJob {
|
|||
return $this->argument;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set this to false to prevent two Jobs from this class from running in parallel
|
||||
*
|
||||
* @param bool $allow
|
||||
* @return void
|
||||
* @since 27.0.0
|
||||
*/
|
||||
public function setAllowParallelRuns(bool $allow): void {
|
||||
$this->allowParallelRuns = $allow;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
* @since 27.0.0
|
||||
*/
|
||||
public function getAllowParallelRuns(): bool {
|
||||
return $this->allowParallelRuns;
|
||||
}
|
||||
|
||||
/**
|
||||
* The actual function that is called to run the job
|
||||
*
|
||||
|
|
|
|||
|
|
@ -21,6 +21,11 @@ class DummyJobList extends \OC\BackgroundJob\JobList {
|
|||
*/
|
||||
private array $jobs = [];
|
||||
|
||||
/**
|
||||
* @var bool[]
|
||||
*/
|
||||
private array $reserved = [];
|
||||
|
||||
private int $last = 0;
|
||||
|
||||
public function __construct() {
|
||||
|
|
@ -135,6 +140,14 @@ class DummyJobList extends \OC\BackgroundJob\JobList {
|
|||
$job->setLastRun(time());
|
||||
}
|
||||
|
||||
public function hasReservedJob(?string $className = null): bool {
|
||||
return $this->reserved[$className ?? ''];
|
||||
}
|
||||
|
||||
public function setHasReservedJob(?string $className, bool $hasReserved): void {
|
||||
$this->reserved[$className ?? ''] = $hasReserved;
|
||||
}
|
||||
|
||||
public function setExecutionTime(IJob $job, $timeTaken): void {
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ use OCP\AppFramework\Utility\ITimeFactory;
|
|||
use OCP\BackgroundJob\IJob;
|
||||
use OCP\DB\QueryBuilder\IQueryBuilder;
|
||||
use OCP\IConfig;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Test\TestCase;
|
||||
|
||||
/**
|
||||
|
|
@ -32,6 +33,7 @@ class JobListTest extends TestCase {
|
|||
|
||||
/** @var \OCP\AppFramework\Utility\ITimeFactory|\PHPUnit\Framework\MockObject\MockObject */
|
||||
protected $timeFactory;
|
||||
private bool $ran = false;
|
||||
|
||||
protected function setUp(): void {
|
||||
parent::setUp();
|
||||
|
|
@ -43,7 +45,8 @@ class JobListTest extends TestCase {
|
|||
$this->instance = new \OC\BackgroundJob\JobList(
|
||||
$this->connection,
|
||||
$this->config,
|
||||
$this->timeFactory
|
||||
$this->timeFactory,
|
||||
\OC::$server->get(LoggerInterface::class),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -244,4 +247,24 @@ class JobListTest extends TestCase {
|
|||
$this->assertGreaterThanOrEqual($timeStart, $addedJob->getLastRun());
|
||||
$this->assertLessThanOrEqual($timeEnd, $addedJob->getLastRun());
|
||||
}
|
||||
|
||||
public function testHasReservedJobs() {
|
||||
$this->clearJobsList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
$this->assertTrue($this->instance->hasReservedJob());
|
||||
$this->assertTrue($this->instance->hasReservedJob(TestJob::class));
|
||||
});
|
||||
$this->instance->add($job);
|
||||
|
||||
$this->assertFalse($this->instance->hasReservedJob());
|
||||
$this->assertFalse($this->instance->hasReservedJob(TestJob::class));
|
||||
|
||||
$job->start($this->instance);
|
||||
|
||||
$this->assertTrue($this->ran);
|
||||
}
|
||||
|
||||
public function markRun() {
|
||||
$this->ran = true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,20 +8,23 @@
|
|||
|
||||
namespace Test\BackgroundJob;
|
||||
|
||||
use OCP\AppFramework\Utility\ITimeFactory;
|
||||
use OCP\ILogger;
|
||||
|
||||
class JobTest extends \Test\TestCase {
|
||||
private $run = false;
|
||||
private ITimeFactory $timeFactory;
|
||||
|
||||
protected function setUp(): void {
|
||||
parent::setUp();
|
||||
$this->run = false;
|
||||
$this->timeFactory = \OC::$server->get(ITimeFactory::class);
|
||||
}
|
||||
|
||||
public function testRemoveAfterException() {
|
||||
$jobList = new DummyJobList();
|
||||
$e = new \Exception();
|
||||
$job = new TestJob($this, function () use ($e) {
|
||||
$job = new TestJob($this->timeFactory, $this, function () use ($e) {
|
||||
throw $e;
|
||||
});
|
||||
$jobList->add($job);
|
||||
|
|
@ -30,8 +33,7 @@ class JobTest extends \Test\TestCase {
|
|||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$logger->expects($this->once())
|
||||
->method('logException')
|
||||
->with($e);
|
||||
->method('error');
|
||||
|
||||
$this->assertCount(1, $jobList->getAll());
|
||||
$job->execute($jobList, $logger);
|
||||
|
|
@ -41,7 +43,7 @@ class JobTest extends \Test\TestCase {
|
|||
|
||||
public function testRemoveAfterError() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this, function () {
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
$test = null;
|
||||
$test->someMethod();
|
||||
});
|
||||
|
|
@ -51,8 +53,7 @@ class JobTest extends \Test\TestCase {
|
|||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$logger->expects($this->once())
|
||||
->method('logException')
|
||||
->with($this->isInstanceOf(\Throwable::class));
|
||||
->method('error');
|
||||
|
||||
$this->assertCount(1, $jobList->getAll());
|
||||
$job->execute($jobList, $logger);
|
||||
|
|
@ -60,6 +61,58 @@ class JobTest extends \Test\TestCase {
|
|||
$this->assertCount(1, $jobList->getAll());
|
||||
}
|
||||
|
||||
public function testDisallowParallelRunsWithNoOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(false);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, false);
|
||||
$jobList->setHasReservedJob(TestJob::class, false);
|
||||
$job->start($jobList);
|
||||
$this->assertTrue($this->run);
|
||||
}
|
||||
|
||||
public function testAllowParallelRunsWithNoOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(true);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, false);
|
||||
$jobList->setHasReservedJob(TestJob::class, false);
|
||||
$job->start($jobList);
|
||||
$this->assertTrue($this->run);
|
||||
}
|
||||
|
||||
public function testAllowParallelRunsWithOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(true);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, true);
|
||||
$jobList->setHasReservedJob(TestJob::class, true);
|
||||
$job->start($jobList);
|
||||
$this->assertTrue($this->run);
|
||||
}
|
||||
|
||||
public function testDisallowParallelRunsWithOtherJobs() {
|
||||
$jobList = new DummyJobList();
|
||||
$job = new TestJob($this->timeFactory, $this, function () {
|
||||
});
|
||||
$job->setAllowParallelRuns(false);
|
||||
$jobList->add($job);
|
||||
|
||||
$jobList->setHasReservedJob(null, true);
|
||||
$jobList->setHasReservedJob(TestJob::class, true);
|
||||
$job->start($jobList);
|
||||
$this->assertFalse($this->run);
|
||||
}
|
||||
|
||||
public function markRun() {
|
||||
$this->run = true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,9 @@
|
|||
|
||||
namespace Test\BackgroundJob;
|
||||
|
||||
class TestJob extends \OC\BackgroundJob\Job {
|
||||
use OCP\AppFramework\Utility\ITimeFactory;
|
||||
|
||||
class TestJob extends \OCP\BackgroundJob\Job {
|
||||
private $testCase;
|
||||
|
||||
/**
|
||||
|
|
@ -20,7 +22,8 @@ class TestJob extends \OC\BackgroundJob\Job {
|
|||
* @param JobTest $testCase
|
||||
* @param callable $callback
|
||||
*/
|
||||
public function __construct($testCase = null, $callback = null) {
|
||||
public function __construct(ITimeFactory $time = null, $testCase = null, $callback = null) {
|
||||
parent::__construct($time ?? \OC::$server->get(ITimeFactory::class));
|
||||
$this->testCase = $testCase;
|
||||
$this->callback = $callback;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue