diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 6821aee65a0..390b6b622ec 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -59,6 +59,7 @@ use OCP\TaskProcessing\Exception\ValidationException; use OCP\TaskProcessing\IInternalTaskType; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\IProvider; +use OCP\TaskProcessing\ISynchronousProgressiveProvider; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\ISynchronousWatermarkingProvider; use OCP\TaskProcessing\ITaskType; @@ -1135,6 +1136,13 @@ class Manager implements IManager { $this->setTaskStatus($task, Task::STATUS_RUNNING); if ($provider instanceof ISynchronousWatermarkingProvider) { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark()); + } elseif ($provider instanceof ISynchronousProgressiveProvider) { + $output = $provider->process( + $task->getUserId(), + $input, + fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), + fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output) + ); } else { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); } @@ -1216,6 +1224,40 @@ class Manager implements IManager { return true; } + public function setTaskIntermediateOutput(int $id, array $output): bool { + // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently + $task = $this->getTask($id); + if ($task->getStatus() === Task::STATUS_CANCELLED) { + return false; + } + $userId = $task->getUserId(); + if ($userId !== null && $userId !== '') { + try { + // TODO figure out how to get the queue with DI + // $queue = Server::get(\OCA\NotifyPush\IQueue::class); + $queue = $this->serverContainer->get(\OCA\NotifyPush\IQueue::class); + $queue->push('notify_custom', [ + 'user' => $userId, + 'message' => 'taskprocessing_task_results', + 'body' => $output, + ]); + error_log('sending to queue!!!!!!'); + } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { + $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); + error_log('NOT sending to queue!!!!!! ' . $e->getMessage()); + } + } + // no output shape validation for now + $task->setOutput($output); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + return true; + } + #[\Override] public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void { // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently diff --git a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php new file mode 100644 index 00000000000..102062d8d1a --- /dev/null +++ b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php @@ -0,0 +1,36 @@ +|numeric|string|File> $input The task input + * @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped. + * @param null|callable(array):bool $reportOutput Set the task intermediate output + * @psalm-return array|numeric|string> + * @throws ProcessingException + * @since 33.0.0 + */ + #[\Override] + public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array; +}