From 118ca6a94b161c237eaa205dd93568aa44fb2763 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 17 Mar 2026 18:21:23 +0000
Subject: [PATCH] feat(taskprocessing): Add worker command with tests and
registration
Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com>
---
core/Command/TaskProcessing/WorkerCommand.php | 150 ++++++++++
core/register_command.php | 2 +
.../TaskProcessing/WorkerCommandTest.php | 264 ++++++++++++++++++
3 files changed, 416 insertions(+)
create mode 100644 core/Command/TaskProcessing/WorkerCommand.php
create mode 100644 tests/Core/Command/TaskProcessing/WorkerCommandTest.php
diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php
new file mode 100644
index 00000000000..29d35be7975
--- /dev/null
+++ b/core/Command/TaskProcessing/WorkerCommand.php
@@ -0,0 +1,150 @@
+setName('taskprocessing:worker')
+ ->setDescription('Run a dedicated worker for synchronous TaskProcessing providers')
+ ->addOption(
+ 'timeout',
+ 't',
+ InputOption::VALUE_OPTIONAL,
+ 'Duration in seconds after which the worker exits (0 = run indefinitely)',
+ 0
+ )
+ ->addOption(
+ 'interval',
+ 'i',
+ InputOption::VALUE_OPTIONAL,
+ 'Sleep duration in seconds between polling iterations when no task was processed',
+ 1
+ )
+ ->addOption(
+ 'once',
+ null,
+ InputOption::VALUE_NONE,
+ 'Process at most one task then exit'
+ );
+ parent::configure();
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int {
+ $startTime = time();
+ $timeout = (int)$input->getOption('timeout');
+ $interval = (int)$input->getOption('interval');
+ $once = $input->getOption('once') === true;
+
+ if ($timeout > 0) {
+ $output->writeln('Task processing worker will stop after ' . $timeout . ' seconds');
+ }
+
+ while (true) {
+ // Stop if timeout exceeded
+ if ($timeout > 0 && ($startTime + $timeout) < time()) {
+ $output->writeln('Timeout reached, exiting...', OutputInterface::VERBOSITY_VERBOSE);
+ break;
+ }
+
+ // Handle SIGTERM/SIGINT gracefully
+ try {
+ $this->abortIfInterrupted();
+ } catch (InterruptedException $e) {
+ $output->writeln('Task processing worker stopped');
+ break;
+ }
+
+ $processedTask = $this->processNextTask($output);
+
+ if ($once) {
+ break;
+ }
+
+ if (!$processedTask) {
+ $output->writeln('No task processed, waiting ' . $interval . ' second(s)...', OutputInterface::VERBOSITY_VERBOSE);
+ sleep($interval);
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Attempt to process one task across all preferred synchronous providers.
+ *
+ * @return bool True if a task was processed, false if no task was found
+ */
+ private function processNextTask(OutputInterface $output): bool {
+ $providers = $this->taskProcessingManager->getProviders();
+
+ foreach ($providers as $provider) {
+ if (!$provider instanceof ISynchronousProvider) {
+ continue;
+ }
+
+ $taskTypeId = $provider->getTaskTypeId();
+
+ // Only use this provider if it is the preferred one for the task type
+ try {
+ $preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId);
+ } catch (Exception $e) {
+ $this->logger->error('Failed to get preferred provider for task type ' . $taskTypeId, ['exception' => $e]);
+ continue;
+ }
+
+ if ($provider->getId() !== $preferredProvider->getId()) {
+ continue;
+ }
+
+ try {
+ $task = $this->taskProcessingManager->getNextScheduledTask([$taskTypeId]);
+ } catch (NotFoundException) {
+ continue;
+ } catch (Exception $e) {
+ $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
+ continue;
+ }
+
+ $output->writeln(
+ 'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(),
+ OutputInterface::VERBOSITY_VERBOSE
+ );
+
+ $this->taskProcessingManager->processTask($task, $provider);
+
+ $output->writeln(
+ 'Finished processing task ' . $task->getId(),
+ OutputInterface::VERBOSITY_VERBOSE
+ );
+
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/core/register_command.php b/core/register_command.php
index 58aed05ba68..d28c1633c62 100644
--- a/core/register_command.php
+++ b/core/register_command.php
@@ -91,6 +91,7 @@ use OC\Core\Command\SystemTag\Edit;
use OC\Core\Command\TaskProcessing\EnabledCommand;
use OC\Core\Command\TaskProcessing\GetCommand;
use OC\Core\Command\TaskProcessing\Statistics;
+use OC\Core\Command\TaskProcessing\WorkerCommand;
use OC\Core\Command\TwoFactorAuth\Cleanup;
use OC\Core\Command\TwoFactorAuth\Enforce;
use OC\Core\Command\TwoFactorAuth\State;
@@ -255,6 +256,7 @@ if ($config->getSystemValueBool('installed', false)) {
$application->add(Server::get(Command\TaskProcessing\ListCommand::class));
$application->add(Server::get(Statistics::class));
$application->add(Server::get(Command\TaskProcessing\Cleanup::class));
+ $application->add(Server::get(WorkerCommand::class));
$application->add(Server::get(RedisCommand::class));
$application->add(Server::get(DistributedClear::class));
diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php
new file mode 100644
index 00000000000..71ebdee0dbc
--- /dev/null
+++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php
@@ -0,0 +1,264 @@
+manager = $this->createMock(IManager::class);
+ $this->logger = $this->createMock(LoggerInterface::class);
+ $this->command = new WorkerCommand($this->manager, $this->logger);
+ }
+
+ /**
+ * Helper to create a minimal ISynchronousProvider mock.
+ */
+ private function createProvider(string $id, string $taskTypeId): ISynchronousProvider&MockObject {
+ $provider = $this->createMock(ISynchronousProvider::class);
+ $provider->method('getId')->willReturn($id);
+ $provider->method('getTaskTypeId')->willReturn($taskTypeId);
+ return $provider;
+ }
+
+ /**
+ * Helper to create a Task mock with an id.
+ */
+ private function createTask(int $id): Task&MockObject {
+ $task = $this->createMock(Task::class);
+ $task->method('getId')->willReturn($id);
+ return $task;
+ }
+
+ public function testOnceExitsAfterNoTask(): void {
+ $this->manager->expects($this->once())
+ ->method('getProviders')
+ ->willReturn([]);
+
+ $input = new ArrayInput(['--once' => true], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $result = $this->command->run($input, $output);
+
+ $this->assertSame(0, $result);
+ }
+
+ public function testOnceProcessesOneTask(): void {
+ $taskTypeId = 'test_task_type';
+ $provider = $this->createProvider('test_provider', $taskTypeId);
+ $task = $this->createTask(42);
+
+ $this->manager->expects($this->once())
+ ->method('getProviders')
+ ->willReturn([$provider]);
+
+ $this->manager->expects($this->once())
+ ->method('getPreferredProvider')
+ ->with($taskTypeId)
+ ->willReturn($provider);
+
+ $this->manager->expects($this->once())
+ ->method('getNextScheduledTask')
+ ->with([$taskTypeId])
+ ->willReturn($task);
+
+ $this->manager->expects($this->once())
+ ->method('processTask')
+ ->with($task, $provider)
+ ->willReturn(true);
+
+ $input = new ArrayInput(['--once' => true], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $result = $this->command->run($input, $output);
+
+ $this->assertSame(0, $result);
+ }
+
+ public function testSkipsNonSynchronousProviders(): void {
+ // A provider that is NOT an ISynchronousProvider
+ $nonSyncProvider = $this->createMock(\OCP\TaskProcessing\IProvider::class);
+ $nonSyncProvider->method('getId')->willReturn('non_sync_provider');
+ $nonSyncProvider->method('getTaskTypeId')->willReturn('some_type');
+
+ $this->manager->expects($this->once())
+ ->method('getProviders')
+ ->willReturn([$nonSyncProvider]);
+
+ $this->manager->expects($this->never())
+ ->method('getPreferredProvider');
+
+ $this->manager->expects($this->never())
+ ->method('getNextScheduledTask');
+
+ $input = new ArrayInput(['--once' => true], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $result = $this->command->run($input, $output);
+
+ $this->assertSame(0, $result);
+ }
+
+ public function testSkipsNonPreferredProviders(): void {
+ $taskTypeId = 'test_task_type';
+ $provider = $this->createProvider('provider_a', $taskTypeId);
+ $preferredProvider = $this->createProvider('provider_b', $taskTypeId);
+
+ $this->manager->expects($this->once())
+ ->method('getProviders')
+ ->willReturn([$provider]);
+
+ $this->manager->expects($this->once())
+ ->method('getPreferredProvider')
+ ->with($taskTypeId)
+ ->willReturn($preferredProvider);
+
+ // provider_a is not preferred (provider_b is), so getNextScheduledTask is never called
+ $this->manager->expects($this->never())
+ ->method('getNextScheduledTask');
+
+ $input = new ArrayInput(['--once' => true], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $result = $this->command->run($input, $output);
+
+ $this->assertSame(0, $result);
+ }
+
+ public function testContinuesWhenNoTaskFound(): void {
+ $taskTypeId = 'test_task_type';
+ $provider = $this->createProvider('test_provider', $taskTypeId);
+
+ $this->manager->expects($this->once())
+ ->method('getProviders')
+ ->willReturn([$provider]);
+
+ $this->manager->expects($this->once())
+ ->method('getPreferredProvider')
+ ->with($taskTypeId)
+ ->willReturn($provider);
+
+ $this->manager->expects($this->once())
+ ->method('getNextScheduledTask')
+ ->with([$taskTypeId])
+ ->willThrowException(new NotFoundException());
+
+ $this->manager->expects($this->never())
+ ->method('processTask');
+
+ $input = new ArrayInput(['--once' => true], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $result = $this->command->run($input, $output);
+
+ $this->assertSame(0, $result);
+ }
+
+ public function testLogsErrorAndContinuesOnException(): void {
+ $taskTypeId = 'test_task_type';
+ $provider = $this->createProvider('test_provider', $taskTypeId);
+
+ $this->manager->expects($this->once())
+ ->method('getProviders')
+ ->willReturn([$provider]);
+
+ $this->manager->expects($this->once())
+ ->method('getPreferredProvider')
+ ->with($taskTypeId)
+ ->willReturn($provider);
+
+ $exception = new Exception('DB error');
+ $this->manager->expects($this->once())
+ ->method('getNextScheduledTask')
+ ->with([$taskTypeId])
+ ->willThrowException($exception);
+
+ $this->logger->expects($this->once())
+ ->method('error')
+ ->with('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $exception]);
+
+ $this->manager->expects($this->never())
+ ->method('processTask');
+
+ $input = new ArrayInput(['--once' => true], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $result = $this->command->run($input, $output);
+
+ $this->assertSame(0, $result);
+ }
+
+ public function testTimeoutExitsLoop(): void {
+ // Arrange: no providers so each iteration does nothing, but timeout=1 should exit quickly
+ $this->manager->method('getProviders')->willReturn([]);
+
+ $input = new ArrayInput(['--timeout' => '1', '--interval' => '0'], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $start = time();
+ $result = $this->command->run($input, $output);
+ $elapsed = time() - $start;
+
+ $this->assertSame(0, $result);
+ // Should have exited within a few seconds
+ $this->assertLessThanOrEqual(5, $elapsed);
+ }
+
+ public function testProcessesFirstMatchingProvider(): void {
+ $taskTypeId1 = 'type_a';
+ $taskTypeId2 = 'type_b';
+
+ $provider1 = $this->createProvider('provider_a', $taskTypeId1);
+ $provider2 = $this->createProvider('provider_b', $taskTypeId2);
+ $task = $this->createTask(7);
+
+ $this->manager->expects($this->once())
+ ->method('getProviders')
+ ->willReturn([$provider1, $provider2]);
+
+ $this->manager->expects($this->once())
+ ->method('getPreferredProvider')
+ ->with($taskTypeId1)
+ ->willReturn($provider1);
+
+ $this->manager->expects($this->once())
+ ->method('getNextScheduledTask')
+ ->with([$taskTypeId1])
+ ->willReturn($task);
+
+ $this->manager->expects($this->once())
+ ->method('processTask')
+ ->with($task, $provider1)
+ ->willReturn(true);
+
+ $input = new ArrayInput(['--once' => true], $this->command->getDefinition());
+ $output = new NullOutput();
+
+ $result = $this->command->run($input, $output);
+
+ $this->assertSame(0, $result);
+ }
+}