diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php new file mode 100644 index 00000000000..29d35be7975 --- /dev/null +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -0,0 +1,150 @@ +setName('taskprocessing:worker') + ->setDescription('Run a dedicated worker for synchronous TaskProcessing providers') + ->addOption( + 'timeout', + 't', + InputOption::VALUE_OPTIONAL, + 'Duration in seconds after which the worker exits (0 = run indefinitely)', + 0 + ) + ->addOption( + 'interval', + 'i', + InputOption::VALUE_OPTIONAL, + 'Sleep duration in seconds between polling iterations when no task was processed', + 1 + ) + ->addOption( + 'once', + null, + InputOption::VALUE_NONE, + 'Process at most one task then exit' + ); + parent::configure(); + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + $startTime = time(); + $timeout = (int)$input->getOption('timeout'); + $interval = (int)$input->getOption('interval'); + $once = $input->getOption('once') === true; + + if ($timeout > 0) { + $output->writeln('Task processing worker will stop after ' . $timeout . ' seconds'); + } + + while (true) { + // Stop if timeout exceeded + if ($timeout > 0 && ($startTime + $timeout) < time()) { + $output->writeln('Timeout reached, exiting...', OutputInterface::VERBOSITY_VERBOSE); + break; + } + + // Handle SIGTERM/SIGINT gracefully + try { + $this->abortIfInterrupted(); + } catch (InterruptedException $e) { + $output->writeln('Task processing worker stopped'); + break; + } + + $processedTask = $this->processNextTask($output); + + if ($once) { + break; + } + + if (!$processedTask) { + $output->writeln('No task processed, waiting ' . $interval . ' second(s)...', OutputInterface::VERBOSITY_VERBOSE); + sleep($interval); + } + } + + return 0; + } + + /** + * Attempt to process one task across all preferred synchronous providers. + * + * @return bool True if a task was processed, false if no task was found + */ + private function processNextTask(OutputInterface $output): bool { + $providers = $this->taskProcessingManager->getProviders(); + + foreach ($providers as $provider) { + if (!$provider instanceof ISynchronousProvider) { + continue; + } + + $taskTypeId = $provider->getTaskTypeId(); + + // Only use this provider if it is the preferred one for the task type + try { + $preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId); + } catch (Exception $e) { + $this->logger->error('Failed to get preferred provider for task type ' . $taskTypeId, ['exception' => $e]); + continue; + } + + if ($provider->getId() !== $preferredProvider->getId()) { + continue; + } + + try { + $task = $this->taskProcessingManager->getNextScheduledTask([$taskTypeId]); + } catch (NotFoundException) { + continue; + } catch (Exception $e) { + $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); + continue; + } + + $output->writeln( + 'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(), + OutputInterface::VERBOSITY_VERBOSE + ); + + $this->taskProcessingManager->processTask($task, $provider); + + $output->writeln( + 'Finished processing task ' . $task->getId(), + OutputInterface::VERBOSITY_VERBOSE + ); + + return true; + } + + return false; + } +} diff --git a/core/register_command.php b/core/register_command.php index 58aed05ba68..d28c1633c62 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -91,6 +91,7 @@ use OC\Core\Command\SystemTag\Edit; use OC\Core\Command\TaskProcessing\EnabledCommand; use OC\Core\Command\TaskProcessing\GetCommand; use OC\Core\Command\TaskProcessing\Statistics; +use OC\Core\Command\TaskProcessing\WorkerCommand; use OC\Core\Command\TwoFactorAuth\Cleanup; use OC\Core\Command\TwoFactorAuth\Enforce; use OC\Core\Command\TwoFactorAuth\State; @@ -255,6 +256,7 @@ if ($config->getSystemValueBool('installed', false)) { $application->add(Server::get(Command\TaskProcessing\ListCommand::class)); $application->add(Server::get(Statistics::class)); $application->add(Server::get(Command\TaskProcessing\Cleanup::class)); + $application->add(Server::get(WorkerCommand::class)); $application->add(Server::get(RedisCommand::class)); $application->add(Server::get(DistributedClear::class)); diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php new file mode 100644 index 00000000000..71ebdee0dbc --- /dev/null +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -0,0 +1,264 @@ +manager = $this->createMock(IManager::class); + $this->logger = $this->createMock(LoggerInterface::class); + $this->command = new WorkerCommand($this->manager, $this->logger); + } + + /** + * Helper to create a minimal ISynchronousProvider mock. + */ + private function createProvider(string $id, string $taskTypeId): ISynchronousProvider&MockObject { + $provider = $this->createMock(ISynchronousProvider::class); + $provider->method('getId')->willReturn($id); + $provider->method('getTaskTypeId')->willReturn($taskTypeId); + return $provider; + } + + /** + * Helper to create a Task mock with an id. + */ + private function createTask(int $id): Task&MockObject { + $task = $this->createMock(Task::class); + $task->method('getId')->willReturn($id); + return $task; + } + + public function testOnceExitsAfterNoTask(): void { + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([]); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testOnceProcessesOneTask(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('test_provider', $taskTypeId); + $task = $this->createTask(42); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($provider); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId]) + ->willReturn($task); + + $this->manager->expects($this->once()) + ->method('processTask') + ->with($task, $provider) + ->willReturn(true); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testSkipsNonSynchronousProviders(): void { + // A provider that is NOT an ISynchronousProvider + $nonSyncProvider = $this->createMock(\OCP\TaskProcessing\IProvider::class); + $nonSyncProvider->method('getId')->willReturn('non_sync_provider'); + $nonSyncProvider->method('getTaskTypeId')->willReturn('some_type'); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$nonSyncProvider]); + + $this->manager->expects($this->never()) + ->method('getPreferredProvider'); + + $this->manager->expects($this->never()) + ->method('getNextScheduledTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testSkipsNonPreferredProviders(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('provider_a', $taskTypeId); + $preferredProvider = $this->createProvider('provider_b', $taskTypeId); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($preferredProvider); + + // provider_a is not preferred (provider_b is), so getNextScheduledTask is never called + $this->manager->expects($this->never()) + ->method('getNextScheduledTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testContinuesWhenNoTaskFound(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('test_provider', $taskTypeId); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($provider); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId]) + ->willThrowException(new NotFoundException()); + + $this->manager->expects($this->never()) + ->method('processTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testLogsErrorAndContinuesOnException(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('test_provider', $taskTypeId); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($provider); + + $exception = new Exception('DB error'); + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId]) + ->willThrowException($exception); + + $this->logger->expects($this->once()) + ->method('error') + ->with('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $exception]); + + $this->manager->expects($this->never()) + ->method('processTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testTimeoutExitsLoop(): void { + // Arrange: no providers so each iteration does nothing, but timeout=1 should exit quickly + $this->manager->method('getProviders')->willReturn([]); + + $input = new ArrayInput(['--timeout' => '1', '--interval' => '0'], $this->command->getDefinition()); + $output = new NullOutput(); + + $start = time(); + $result = $this->command->run($input, $output); + $elapsed = time() - $start; + + $this->assertSame(0, $result); + // Should have exited within a few seconds + $this->assertLessThanOrEqual(5, $elapsed); + } + + public function testProcessesFirstMatchingProvider(): void { + $taskTypeId1 = 'type_a'; + $taskTypeId2 = 'type_b'; + + $provider1 = $this->createProvider('provider_a', $taskTypeId1); + $provider2 = $this->createProvider('provider_b', $taskTypeId2); + $task = $this->createTask(7); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider1, $provider2]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId1) + ->willReturn($provider1); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId1]) + ->willReturn($task); + + $this->manager->expects($this->once()) + ->method('processTask') + ->with($task, $provider1) + ->willReturn(true); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } +}