mirror of
https://github.com/nextcloud/server.git
synced 2026-05-28 04:32:30 -04:00
feat(taskprocessing): add IManager::runTask method to run task synchronously
Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
This commit is contained in:
parent
829642c9e5
commit
c1ed256d50
3 changed files with 152 additions and 73 deletions
|
|
@ -716,55 +716,69 @@ class Manager implements IManager {
|
|||
if (!$this->canHandleTask($task)) {
|
||||
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
|
||||
}
|
||||
$taskTypes = $this->getAvailableTaskTypes();
|
||||
$inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape'];
|
||||
$inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults'];
|
||||
$inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues'];
|
||||
$optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape'];
|
||||
$optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues'];
|
||||
$optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults'];
|
||||
// validate input
|
||||
$this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
|
||||
$this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
|
||||
// authenticate access to mentioned files
|
||||
$ids = [];
|
||||
foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
|
||||
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
|
||||
/** @var list<int>|int $inputSlot */
|
||||
$inputSlot = $task->getInput()[$key];
|
||||
if (is_array($inputSlot)) {
|
||||
$ids += $inputSlot;
|
||||
} else {
|
||||
$ids[] = $inputSlot;
|
||||
}
|
||||
}
|
||||
}
|
||||
foreach ($ids as $fileId) {
|
||||
$this->validateFileId($fileId);
|
||||
$this->validateUserAccessToFile($fileId, $task->getUserId());
|
||||
}
|
||||
// remove superfluous keys and set input
|
||||
$input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
|
||||
$inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
|
||||
$task->setInput($inputWithDefaults);
|
||||
$this->prepareTask($task);
|
||||
$task->setStatus(Task::STATUS_SCHEDULED);
|
||||
$task->setScheduledAt(time());
|
||||
$provider = $this->getPreferredProvider($task->getTaskTypeId());
|
||||
// calculate expected completion time
|
||||
$completionExpectedAt = new \DateTime('now');
|
||||
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
|
||||
$task->setCompletionExpectedAt($completionExpectedAt);
|
||||
// create a db entity and insert into db table
|
||||
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
|
||||
$this->taskMapper->insert($taskEntity);
|
||||
// make sure the scheduler knows the id
|
||||
$task->setId($taskEntity->getId());
|
||||
$this->storeTask($task);
|
||||
// schedule synchronous job if the provider is synchronous
|
||||
$provider = $this->getPreferredProvider($task->getTaskTypeId());
|
||||
if ($provider instanceof ISynchronousProvider) {
|
||||
$this->jobList->add(SynchronousBackgroundJob::class, null);
|
||||
}
|
||||
}
|
||||
|
||||
public function runTask(Task $task): Task {
|
||||
if (!$this->canHandleTask($task)) {
|
||||
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
|
||||
}
|
||||
|
||||
$provider = $this->getPreferredProvider($task->getTaskTypeId());
|
||||
if ($provider instanceof ISynchronousProvider) {
|
||||
$this->prepareTask($task);
|
||||
$task->setStatus(Task::STATUS_SCHEDULED);
|
||||
$this->storeTask($task);
|
||||
$this->processTask($task, $provider);
|
||||
$task = $this->getTask($task->getId());
|
||||
} else {
|
||||
$this->scheduleTask($task);
|
||||
// poll task
|
||||
while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) {
|
||||
sleep(1);
|
||||
$task = $this->getTask($task->getId());
|
||||
}
|
||||
}
|
||||
return $task;
|
||||
}
|
||||
|
||||
public function processTask(Task $task, ISynchronousProvider $provider): bool {
|
||||
try {
|
||||
try {
|
||||
$input = $this->prepareInputData($task);
|
||||
} catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
|
||||
$this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
|
||||
$this->setTaskResult($task->getId(), $e->getMessage(), null);
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
$this->setTaskStatus($task, Task::STATUS_RUNNING);
|
||||
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
|
||||
} catch (ProcessingException $e) {
|
||||
$this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
|
||||
$this->setTaskResult($task->getId(), $e->getMessage(), null);
|
||||
return false;
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
|
||||
$this->setTaskResult($task->getId(), $e->getMessage(), null);
|
||||
return false;
|
||||
}
|
||||
$this->setTaskResult($task->getId(), null, $output);
|
||||
} catch (NotFoundException $e) {
|
||||
$this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
|
||||
} catch (Exception $e) {
|
||||
$this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public function deleteTask(Task $task): void {
|
||||
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
|
||||
$this->taskMapper->delete($taskEntity);
|
||||
|
|
@ -1095,6 +1109,72 @@ class Manager implements IManager {
|
|||
$this->taskMapper->update($taskEntity);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate input, fill input default values, set completionExpectedAt, set scheduledAt
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
* @throws UnauthorizedException
|
||||
* @throws ValidationException
|
||||
* @throws \OCP\TaskProcessing\Exception\Exception
|
||||
*/
|
||||
private function prepareTask(Task $task): void {
|
||||
$taskTypes = $this->getAvailableTaskTypes();
|
||||
$taskType = $taskTypes[$task->getTaskTypeId()];
|
||||
$inputShape = $taskType['inputShape'];
|
||||
$inputShapeDefaults = $taskType['inputShapeDefaults'];
|
||||
$inputShapeEnumValues = $taskType['inputShapeEnumValues'];
|
||||
$optionalInputShape = $taskType['optionalInputShape'];
|
||||
$optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues'];
|
||||
$optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults'];
|
||||
// validate input
|
||||
$this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
|
||||
$this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
|
||||
// authenticate access to mentioned files
|
||||
$ids = [];
|
||||
foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
|
||||
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
|
||||
/** @var list<int>|int $inputSlot */
|
||||
$inputSlot = $task->getInput()[$key];
|
||||
if (is_array($inputSlot)) {
|
||||
$ids += $inputSlot;
|
||||
} else {
|
||||
$ids[] = $inputSlot;
|
||||
}
|
||||
}
|
||||
}
|
||||
foreach ($ids as $fileId) {
|
||||
$this->validateFileId($fileId);
|
||||
$this->validateUserAccessToFile($fileId, $task->getUserId());
|
||||
}
|
||||
// remove superfluous keys and set input
|
||||
$input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
|
||||
$inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
|
||||
$task->setInput($inputWithDefaults);
|
||||
$task->setScheduledAt(time());
|
||||
$provider = $this->getPreferredProvider($task->getTaskTypeId());
|
||||
// calculate expected completion time
|
||||
$completionExpectedAt = new \DateTime('now');
|
||||
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
|
||||
$task->setCompletionExpectedAt($completionExpectedAt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
* @throws Exception
|
||||
* @throws \JsonException
|
||||
*/
|
||||
private function storeTask(Task $task): void {
|
||||
// create a db entity and insert into db table
|
||||
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
|
||||
$this->taskMapper->insert($taskEntity);
|
||||
// make sure the scheduler knows the id
|
||||
$task->setId($taskEntity->getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array $output
|
||||
* @param ShapeDescriptor[] ...$specs the specs that define which keys to keep
|
||||
|
|
|
|||
|
|
@ -57,37 +57,9 @@ class SynchronousBackgroundJob extends QueuedJob {
|
|||
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
try {
|
||||
$input = $this->taskProcessingManager->prepareInputData($task);
|
||||
} catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
|
||||
$this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
|
||||
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
|
||||
// Schedule again
|
||||
$this->jobList->add(self::class, $argument);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
$this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING);
|
||||
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
|
||||
} catch (ProcessingException $e) {
|
||||
$this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
|
||||
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
|
||||
// Schedule again
|
||||
$this->jobList->add(self::class, $argument);
|
||||
return;
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
|
||||
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
|
||||
// Schedule again
|
||||
$this->jobList->add(self::class, $argument);
|
||||
return;
|
||||
}
|
||||
$this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
|
||||
} catch (NotFoundException $e) {
|
||||
$this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
|
||||
} catch (Exception $e) {
|
||||
$this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
|
||||
if (!$this->taskProcessingManager->processTask($task, $provider)) {
|
||||
// Schedule again
|
||||
$this->jobList->add(self::class, $argument);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -61,6 +61,33 @@ interface IManager {
|
|||
*/
|
||||
public function scheduleTask(Task $task): void;
|
||||
|
||||
/**
|
||||
* Run the task and return the finished task
|
||||
*
|
||||
* @param Task $task The task to run
|
||||
* @return Task The result task
|
||||
* @throws PreConditionNotMetException If no or not the requested provider was registered but this method was still called
|
||||
* @throws ValidationException the given task input didn't pass validation against the task type's input shape and/or the providers optional input shape specs
|
||||
* @throws Exception storing the task in the database failed
|
||||
* @throws UnauthorizedException the user scheduling the task does not have access to the files used in the input
|
||||
* @since 30.0.0
|
||||
*/
|
||||
public function runTask(Task $task): Task;
|
||||
|
||||
/**
|
||||
* Process task with a synchronous provider
|
||||
*
|
||||
* Prepare task input data and run the process method of the provider
|
||||
* This should only be used by OC\TaskProcessing\SynchronousBackgroundJob::run() and OCP\TaskProcessing\IManager::runTask()
|
||||
*
|
||||
* @param Task $task
|
||||
* @param ISynchronousProvider $provider
|
||||
* @return bool True if the task has run successfully
|
||||
* @throws Exception
|
||||
* @since 30.0.0
|
||||
*/
|
||||
public function processTask(Task $task, ISynchronousProvider $provider): bool;
|
||||
|
||||
/**
|
||||
* Delete a task that has been scheduled before
|
||||
*
|
||||
|
|
|
|||
Loading…
Reference in a new issue