feat(task-streaming): send a notify_push message whenever a task status changes

Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
This commit is contained in:
Julien Veyssier 2026-06-03 16:55:15 +02:00
parent 8ee02812ef
commit 4da89e92dd

View file

@ -1071,6 +1071,7 @@ class Manager implements IManager {
}
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$this->notifyTaskStatus($task, Task::STATUS_SCHEDULED);
$this->storeTask($task);
// schedule synchronous job if the provider is synchronous
$provider = $this->getPreferredProvider($task->getTaskTypeId());
@ -1114,6 +1115,7 @@ class Manager implements IManager {
if ($provider instanceof ISynchronousProvider) {
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$this->notifyTaskStatus($task, Task::STATUS_SCHEDULED);
$this->storeTask($task);
$this->processTask($task, $provider);
$task = $this->getTask($task->getId());
@ -1182,6 +1184,31 @@ class Manager implements IManager {
$this->taskMapper->delete($taskEntity);
}
private function notifyTaskStatus(Task $task, int $status): void {
$userId = $task->getUserId();
if ($userId !== null
&& $userId !== ''
&& $this->appManager->isEnabledForAnyone('notify_push')
&& interface_exists('\OCA\NotifyPush\Queue\IQueue')
) {
try {
/** @psalm-suppress UndefinedClass */
$queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class);
/** @psalm-suppress UndefinedClass */
$queue->push('notify_custom', [
'user' => $userId,
'message' => 'task_update',
'body' => [
'task_id' => $task->getId(),
'new_status' => $status,
],
]);
} catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) {
$this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue');
}
}
}
#[\Override]
public function getTask(int $id): Task {
try {
@ -1204,6 +1231,9 @@ class Manager implements IManager {
}
$task->setStatus(Task::STATUS_CANCELLED);
$task->setEndedAt(time());
$this->notifyTaskStatus($task, Task::STATUS_CANCELLED);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
@ -1225,6 +1255,7 @@ class Manager implements IManager {
$task->setStartedAt(time());
}
$task->setStatus(Task::STATUS_RUNNING);
$this->notifyTaskStatus($task, Task::STATUS_RUNNING);
$task->setProgress($progress);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
@ -1289,6 +1320,7 @@ class Manager implements IManager {
}
if ($error !== null) {
$task->setStatus(Task::STATUS_FAILED);
$this->notifyTaskStatus($task, Task::STATUS_FAILED);
$task->setEndedAt(time());
// truncate error message to 4000 characters
$task->setErrorMessage(substr($error, 0, 4000));
@ -1331,10 +1363,12 @@ class Manager implements IManager {
$task->setOutput($output);
$task->setProgress(1);
$task->setStatus(Task::STATUS_SUCCESSFUL);
$this->notifyTaskStatus($task, Task::STATUS_SUCCESSFUL);
$task->setEndedAt(time());
} catch (ValidationException $e) {
$task->setProgress(1);
$task->setStatus(Task::STATUS_FAILED);
$this->notifyTaskStatus($task, Task::STATUS_FAILED);
$task->setEndedAt(time());
$error = 'The task was processed successfully but the provider\'s output doesn\'t pass validation against the task type\'s outputShape spec and/or the provider\'s own optionalOutputShape spec';
$task->setErrorMessage($error);
@ -1342,6 +1376,7 @@ class Manager implements IManager {
} catch (NotPermittedException $e) {
$task->setProgress(1);
$task->setStatus(Task::STATUS_FAILED);
$this->notifyTaskStatus($task, Task::STATUS_FAILED);
$task->setEndedAt(time());
$error = 'The task was processed successfully but storing the output in a file failed';
$task->setErrorMessage($error);
@ -1349,6 +1384,7 @@ class Manager implements IManager {
} catch (InvalidPathException|\OCP\Files\NotFoundException $e) {
$task->setProgress(1);
$task->setStatus(Task::STATUS_FAILED);
$this->notifyTaskStatus($task, Task::STATUS_FAILED);
$task->setEndedAt(time());
$error = 'The task was processed successfully but the result file could not be found';
$task->setErrorMessage($error);
@ -1572,6 +1608,7 @@ class Manager implements IManager {
return false;
}
$task->setStatus(Task::STATUS_RUNNING);
$this->notifyTaskStatus($task, Task::STATUS_RUNNING);
return true;
}
@ -1590,6 +1627,7 @@ class Manager implements IManager {
$task->setScheduledAt(time());
}
$task->setStatus($status);
$this->notifyTaskStatus($task, $status);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->update($taskEntity);
}