From 4da89e92dd42a0f4134717d6dd66994a7880cfc1 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Wed, 3 Jun 2026 16:55:15 +0200 Subject: [PATCH] feat(task-streaming): send a notify_push message whenever a task status changes Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 38 ++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 59affee3cc2..1557026396f 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1071,6 +1071,7 @@ class Manager implements IManager { } $this->prepareTask($task); $task->setStatus(Task::STATUS_SCHEDULED); + $this->notifyTaskStatus($task, Task::STATUS_SCHEDULED); $this->storeTask($task); // schedule synchronous job if the provider is synchronous $provider = $this->getPreferredProvider($task->getTaskTypeId()); @@ -1114,6 +1115,7 @@ class Manager implements IManager { if ($provider instanceof ISynchronousProvider) { $this->prepareTask($task); $task->setStatus(Task::STATUS_SCHEDULED); + $this->notifyTaskStatus($task, Task::STATUS_SCHEDULED); $this->storeTask($task); $this->processTask($task, $provider); $task = $this->getTask($task->getId()); @@ -1182,6 +1184,31 @@ class Manager implements IManager { $this->taskMapper->delete($taskEntity); } + private function notifyTaskStatus(Task $task, int $status): void { + $userId = $task->getUserId(); + if ($userId !== null + && $userId !== '' + && $this->appManager->isEnabledForAnyone('notify_push') + && interface_exists('\OCA\NotifyPush\Queue\IQueue') + ) { + try { + /** @psalm-suppress UndefinedClass */ + $queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class); + /** @psalm-suppress UndefinedClass */ + $queue->push('notify_custom', [ + 'user' => $userId, + 'message' => 'task_update', + 'body' => [ + 'task_id' => $task->getId(), + 'new_status' => $status, + ], + ]); + } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { + $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); + } + } + } + #[\Override] public function getTask(int $id): Task { try { @@ -1204,6 +1231,9 @@ class Manager implements IManager { } $task->setStatus(Task::STATUS_CANCELLED); $task->setEndedAt(time()); + + $this->notifyTaskStatus($task, Task::STATUS_CANCELLED); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); try { $this->taskMapper->update($taskEntity); @@ -1225,6 +1255,7 @@ class Manager implements IManager { $task->setStartedAt(time()); } $task->setStatus(Task::STATUS_RUNNING); + $this->notifyTaskStatus($task, Task::STATUS_RUNNING); $task->setProgress($progress); $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); try { @@ -1289,6 +1320,7 @@ class Manager implements IManager { } if ($error !== null) { $task->setStatus(Task::STATUS_FAILED); + $this->notifyTaskStatus($task, Task::STATUS_FAILED); $task->setEndedAt(time()); // truncate error message to 4000 characters $task->setErrorMessage(substr($error, 0, 4000)); @@ -1331,10 +1363,12 @@ class Manager implements IManager { $task->setOutput($output); $task->setProgress(1); $task->setStatus(Task::STATUS_SUCCESSFUL); + $this->notifyTaskStatus($task, Task::STATUS_SUCCESSFUL); $task->setEndedAt(time()); } catch (ValidationException $e) { $task->setProgress(1); $task->setStatus(Task::STATUS_FAILED); + $this->notifyTaskStatus($task, Task::STATUS_FAILED); $task->setEndedAt(time()); $error = 'The task was processed successfully but the provider\'s output doesn\'t pass validation against the task type\'s outputShape spec and/or the provider\'s own optionalOutputShape spec'; $task->setErrorMessage($error); @@ -1342,6 +1376,7 @@ class Manager implements IManager { } catch (NotPermittedException $e) { $task->setProgress(1); $task->setStatus(Task::STATUS_FAILED); + $this->notifyTaskStatus($task, Task::STATUS_FAILED); $task->setEndedAt(time()); $error = 'The task was processed successfully but storing the output in a file failed'; $task->setErrorMessage($error); @@ -1349,6 +1384,7 @@ class Manager implements IManager { } catch (InvalidPathException|\OCP\Files\NotFoundException $e) { $task->setProgress(1); $task->setStatus(Task::STATUS_FAILED); + $this->notifyTaskStatus($task, Task::STATUS_FAILED); $task->setEndedAt(time()); $error = 'The task was processed successfully but the result file could not be found'; $task->setErrorMessage($error); @@ -1572,6 +1608,7 @@ class Manager implements IManager { return false; } $task->setStatus(Task::STATUS_RUNNING); + $this->notifyTaskStatus($task, Task::STATUS_RUNNING); return true; } @@ -1590,6 +1627,7 @@ class Manager implements IManager { $task->setScheduledAt(time()); } $task->setStatus($status); + $this->notifyTaskStatus($task, $status); $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); $this->taskMapper->update($taskEntity); }