feat(task-streaming): allow the Php providers to set intermediate results with a callback in process

Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
This commit is contained in:
Julien Veyssier 2026-05-11 15:35:54 +02:00
parent a118773895
commit c7b866c7c3
No known key found for this signature in database
GPG key ID: 4141FEE162030638
2 changed files with 78 additions and 0 deletions

View file

@ -59,6 +59,7 @@ use OCP\TaskProcessing\Exception\ValidationException;
use OCP\TaskProcessing\IInternalTaskType;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\IProvider;
use OCP\TaskProcessing\ISynchronousProgressiveProvider;
use OCP\TaskProcessing\ISynchronousProvider;
use OCP\TaskProcessing\ISynchronousWatermarkingProvider;
use OCP\TaskProcessing\ITaskType;
@ -1135,6 +1136,13 @@ class Manager implements IManager {
$this->setTaskStatus($task, Task::STATUS_RUNNING);
if ($provider instanceof ISynchronousWatermarkingProvider) {
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark());
} elseif ($provider instanceof ISynchronousProgressiveProvider) {
$output = $provider->process(
$task->getUserId(),
$input,
fn (float $progress) => $this->setTaskProgress($task->getId(), $progress),
fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output)
);
} else {
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
}
@ -1216,6 +1224,40 @@ class Manager implements IManager {
return true;
}
public function setTaskIntermediateOutput(int $id, array $output): bool {
// TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently
$task = $this->getTask($id);
if ($task->getStatus() === Task::STATUS_CANCELLED) {
return false;
}
$userId = $task->getUserId();
if ($userId !== null && $userId !== '') {
try {
// TODO figure out how to get the queue with DI
// $queue = Server::get(\OCA\NotifyPush\IQueue::class);
$queue = $this->serverContainer->get(\OCA\NotifyPush\IQueue::class);
$queue->push('notify_custom', [
'user' => $userId,
'message' => 'taskprocessing_task_results',
'body' => $output,
]);
error_log('sending to queue!!!!!!');
} catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) {
$this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue');
error_log('NOT sending to queue!!!!!! ' . $e->getMessage());
}
}
// no output shape validation for now
$task->setOutput($output);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
}
return true;
}
#[\Override]
public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void {
// TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently

View file

@ -0,0 +1,36 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OCP\TaskProcessing;
use OCP\Files\File;
use OCP\TaskProcessing\Exception\ProcessingException;
/**
* This is the interface that is implemented by apps that
* implement a task processing provider that supports updating the output during processing
* @since 34.0.0
*/
interface ISynchronousProgressiveProvider extends ISynchronousProvider {
/**
* Returns the shape of optional output parameters
*
* @param null|string $userId The user that created the current task
* @param array<string, list<numeric|string|File>|numeric|string|File> $input The task input
* @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped.
* @param null|callable(array):bool $reportOutput Set the task intermediate output
* @psalm-return array<string, list<numeric|string>|numeric|string>
* @throws ProcessingException
* @since 33.0.0
*/
#[\Override]
public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array;
}