mirror of
https://github.com/nextcloud/server.git
synced 2026-05-28 04:32:30 -04:00
fix: Fix task type starvation: collect all eligible task types then pick the oldest task
Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com>
This commit is contained in:
parent
2d24349ae5
commit
f212dc4664
1 changed files with 41 additions and 27 deletions
|
|
@ -105,16 +105,20 @@ class WorkerCommand extends Base {
|
|||
/**
|
||||
* Attempt to process one task across all preferred synchronous providers.
|
||||
*
|
||||
* To avoid starvation, all eligible task types are first collected and then
|
||||
* the oldest scheduled task across all of them is fetched in a single query.
|
||||
* This ensures that tasks are processed in the order they were scheduled,
|
||||
* regardless of which provider handles them.
|
||||
*
|
||||
* @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
|
||||
* @return bool True if a task was processed, false if no task was found
|
||||
*/
|
||||
private function processNextTask(OutputInterface $output, array $taskTypes = []): bool {
|
||||
$providers = $this->taskProcessingManager->getProviders();
|
||||
// Shuffle providers to avoid starvation: if providers are always iterated
|
||||
// in the same order, a provider with a constant stream of tasks would
|
||||
// prevent all subsequent providers from ever being processed.
|
||||
shuffle($providers);
|
||||
|
||||
// Build a map of eligible taskTypeId => provider for all preferred synchronous providers
|
||||
/** @var array<string, ISynchronousProvider> $eligibleProviders */
|
||||
$eligibleProviders = [];
|
||||
foreach ($providers as $provider) {
|
||||
if (!$provider instanceof ISynchronousProvider) {
|
||||
continue;
|
||||
|
|
@ -139,30 +143,40 @@ class WorkerCommand extends Base {
|
|||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
$task = $this->taskProcessingManager->getNextScheduledTask([$taskTypeId]);
|
||||
} catch (NotFoundException) {
|
||||
continue;
|
||||
} catch (Exception $e) {
|
||||
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$output->writeln(
|
||||
'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(),
|
||||
OutputInterface::VERBOSITY_VERBOSE
|
||||
);
|
||||
|
||||
$this->taskProcessingManager->processTask($task, $provider);
|
||||
|
||||
$output->writeln(
|
||||
'Finished processing task ' . $task->getId(),
|
||||
OutputInterface::VERBOSITY_VERBOSE
|
||||
);
|
||||
|
||||
return true;
|
||||
$eligibleProviders[$taskTypeId] = $provider;
|
||||
}
|
||||
|
||||
return false;
|
||||
if (empty($eligibleProviders)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Fetch the oldest scheduled task across all eligible task types in one query.
|
||||
// This naturally prevents starvation: regardless of how many tasks one provider
|
||||
// has queued, another provider's older tasks will be picked up first.
|
||||
try {
|
||||
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders));
|
||||
} catch (NotFoundException) {
|
||||
return false;
|
||||
} catch (Exception $e) {
|
||||
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
|
||||
return false;
|
||||
}
|
||||
|
||||
$taskTypeId = $task->getTaskTypeId();
|
||||
$provider = $eligibleProviders[$taskTypeId];
|
||||
|
||||
$output->writeln(
|
||||
'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(),
|
||||
OutputInterface::VERBOSITY_VERBOSE
|
||||
);
|
||||
|
||||
$this->taskProcessingManager->processTask($task, $provider);
|
||||
|
||||
$output->writeln(
|
||||
'Finished processing task ' . $task->getId(),
|
||||
OutputInterface::VERBOSITY_VERBOSE
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue