Merge pull request #60500 from nextcloud/enh/noid/taskprocessing-streaming

On behalf of Julien
This commit is contained in:
Daphne Muller 2026-06-11 10:22:33 +02:00 committed by GitHub
commit 69af641e34
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 1055 additions and 8 deletions

View file

@ -643,6 +643,37 @@ class TaskProcessingApiController extends OCSController {
}
}
/**
* Sets the task intermediate result while it is running
*
* @param int $taskId The id of the task
* @param array<string,mixed> $output The intermediate task output, files are represented by their IDs
* @return DataResponse<Http::STATUS_OK, array{task: CoreTaskProcessingTask}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_NOT_FOUND, array{message: string}, array{}>
*
* 200: Result updated successfully
* 404: Task not found
*/
#[ExAppRequired]
#[ApiRoute(verb: 'POST', url: '/tasks_provider/{taskId}/stream-result', root: '/taskprocessing')]
public function setIntermediateResult(int $taskId, array $output): DataResponse {
try {
// set result
$this->taskProcessingManager->setTaskIntermediateOutput($taskId, $output);
$task = $this->taskProcessingManager->getTask($taskId);
/** @var CoreTaskProcessingTask $json */
$json = $task->jsonSerialize();
return new DataResponse([
'task' => $json,
]);
} catch (NotFoundException) {
return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND);
} catch (Exception) {
return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR);
}
}
/**
* @return DataResponse<Http::STATUS_OK, array{task: CoreTaskProcessingTask}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_NOT_FOUND, array{message: string}, array{}>
*/

View file

@ -0,0 +1,51 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\Core\Migrations;
use Closure;
use OCP\DB\ISchemaWrapper;
use OCP\DB\Types;
use OCP\Migration\Attributes\AddColumn;
use OCP\Migration\Attributes\ColumnType;
use OCP\Migration\IOutput;
use OCP\Migration\SimpleMigrationStep;
/**
*
*/
#[AddColumn(table: 'taskprocessing_tasks', name: 'prefer_streaming', type: ColumnType::SMALLINT)]
class Version35000Date20260527162338 extends SimpleMigrationStep {
/**
* @param IOutput $output
* @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
* @param array $options
* @return null|ISchemaWrapper
*/
#[\Override]
public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
/** @var ISchemaWrapper $schema */
$schema = $schemaClosure();
if ($schema->hasTable('taskprocessing_tasks')) {
$table = $schema->getTable('taskprocessing_tasks');
if (!$table->hasColumn('prefer_streaming')) {
$table->addColumn('prefer_streaming', Types::SMALLINT, [
'notnull' => true,
'default' => 1,
'unsigned' => true,
]);
return $schema;
}
}
return null;
}
}

View file

@ -213,6 +213,7 @@ namespace OC\Core;
* allowCleanup: bool,
* includeWatermark: bool,
* userFacingErrorMessage: ?string,
* preferStreaming: bool,
* }
*
* @psalm-type CoreProfileAction = array{

View file

@ -204,7 +204,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -285,6 +286,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},
@ -2207,6 +2211,240 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": {
"post": {
"operationId": "task_processing_api-set-intermediate-result",
"summary": "Sets the task intermediate result while it is running",
"description": "This endpoint requires admin access",
"tags": [
"task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"output"
],
"properties": {
"output": {
"type": "object",
"description": "The intermediate task output, files are represented by their IDs",
"additionalProperties": {
"type": "object"
}
}
}
}
}
}
},
"parameters": [
{
"name": "taskId",
"in": "path",
"description": "The id of the task",
"required": true,
"schema": {
"type": "integer",
"format": "int64"
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Result updated successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"task"
],
"properties": {
"task": {
"$ref": "#/components/schemas/TaskProcessingTask"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"404": {
"description": "Task not found",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": {
"post": {
"operationId": "task_processing_api-cancel-task-ex-app-endpoint",

View file

@ -664,7 +664,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -745,6 +746,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},
@ -12017,6 +12021,240 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": {
"post": {
"operationId": "task_processing_api-set-intermediate-result",
"summary": "Sets the task intermediate result while it is running",
"description": "This endpoint requires admin access",
"tags": [
"task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"output"
],
"properties": {
"output": {
"type": "object",
"description": "The intermediate task output, files are represented by their IDs",
"additionalProperties": {
"type": "object"
}
}
}
}
}
}
},
"parameters": [
{
"name": "taskId",
"in": "path",
"description": "The id of the task",
"required": true,
"schema": {
"type": "integer",
"format": "int64"
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Result updated successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"task"
],
"properties": {
"task": {
"$ref": "#/components/schemas/TaskProcessingTask"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"404": {
"description": "Task not found",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": {
"post": {
"operationId": "task_processing_api-cancel-task-ex-app-endpoint",

View file

@ -664,7 +664,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -745,6 +746,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},

View file

@ -923,12 +923,14 @@ return array(
'OCP\\TaskProcessing\\IInternalTaskType' => $baseDir . '/lib/public/TaskProcessing/IInternalTaskType.php',
'OCP\\TaskProcessing\\IManager' => $baseDir . '/lib/public/TaskProcessing/IManager.php',
'OCP\\TaskProcessing\\IProvider' => $baseDir . '/lib/public/TaskProcessing/IProvider.php',
'OCP\\TaskProcessing\\ISynchronousOptionsAwareProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousOptionsAwareProvider.php',
'OCP\\TaskProcessing\\ISynchronousProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousProvider.php',
'OCP\\TaskProcessing\\ISynchronousWatermarkingProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php',
'OCP\\TaskProcessing\\ITaskType' => $baseDir . '/lib/public/TaskProcessing/ITaskType.php',
'OCP\\TaskProcessing\\ITriggerableProvider' => $baseDir . '/lib/public/TaskProcessing/ITriggerableProvider.php',
'OCP\\TaskProcessing\\ShapeDescriptor' => $baseDir . '/lib/public/TaskProcessing/ShapeDescriptor.php',
'OCP\\TaskProcessing\\ShapeEnumValue' => $baseDir . '/lib/public/TaskProcessing/ShapeEnumValue.php',
'OCP\\TaskProcessing\\SynchronousProviderOptions' => $baseDir . '/lib/public/TaskProcessing/SynchronousProviderOptions.php',
'OCP\\TaskProcessing\\Task' => $baseDir . '/lib/public/TaskProcessing/Task.php',
'OCP\\TaskProcessing\\TaskTypes\\AnalyzeImages' => $baseDir . '/lib/public/TaskProcessing/TaskTypes/AnalyzeImages.php',
'OCP\\TaskProcessing\\TaskTypes\\AudioToAudioChat' => $baseDir . '/lib/public/TaskProcessing/TaskTypes/AudioToAudioChat.php',
@ -1608,6 +1610,7 @@ return array(
'OC\\Core\\Migrations\\Version34000Date20260415161745' => $baseDir . '/core/Migrations/Version34000Date20260415161745.php',
'OC\\Core\\Migrations\\Version34000Date20260518163022' => $baseDir . '/core/Migrations/Version34000Date20260518163022.php',
'OC\\Core\\Migrations\\Version34000Date20260521110333' => $baseDir . '/core/Migrations/Version34000Date20260521110333.php',
'OC\\Core\\Migrations\\Version35000Date20260527162338' => $baseDir . '/core/Migrations/Version35000Date20260527162338.php',
'OC\\Core\\Notification\\CoreNotifier' => $baseDir . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => $baseDir . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\CronService' => $baseDir . '/core/Service/CronService.php',

View file

@ -964,12 +964,14 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
'OCP\\TaskProcessing\\IInternalTaskType' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IInternalTaskType.php',
'OCP\\TaskProcessing\\IManager' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IManager.php',
'OCP\\TaskProcessing\\IProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IProvider.php',
'OCP\\TaskProcessing\\ISynchronousOptionsAwareProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousOptionsAwareProvider.php',
'OCP\\TaskProcessing\\ISynchronousProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousProvider.php',
'OCP\\TaskProcessing\\ISynchronousWatermarkingProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php',
'OCP\\TaskProcessing\\ITaskType' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ITaskType.php',
'OCP\\TaskProcessing\\ITriggerableProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ITriggerableProvider.php',
'OCP\\TaskProcessing\\ShapeDescriptor' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ShapeDescriptor.php',
'OCP\\TaskProcessing\\ShapeEnumValue' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ShapeEnumValue.php',
'OCP\\TaskProcessing\\SynchronousProviderOptions' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/SynchronousProviderOptions.php',
'OCP\\TaskProcessing\\Task' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/Task.php',
'OCP\\TaskProcessing\\TaskTypes\\AnalyzeImages' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/TaskTypes/AnalyzeImages.php',
'OCP\\TaskProcessing\\TaskTypes\\AudioToAudioChat' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/TaskTypes/AudioToAudioChat.php',
@ -1649,6 +1651,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
'OC\\Core\\Migrations\\Version34000Date20260415161745' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260415161745.php',
'OC\\Core\\Migrations\\Version34000Date20260518163022' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260518163022.php',
'OC\\Core\\Migrations\\Version34000Date20260521110333' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260521110333.php',
'OC\\Core\\Migrations\\Version35000Date20260527162338' => __DIR__ . '/../../..' . '/core/Migrations/Version35000Date20260527162338.php',
'OC\\Core\\Notification\\CoreNotifier' => __DIR__ . '/../../..' . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => __DIR__ . '/../../..' . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\CronService' => __DIR__ . '/../../..' . '/core/Service/CronService.php',

View file

@ -51,6 +51,8 @@ use OCP\TaskProcessing\Task as OCPTask;
* @method null|string getUserFacingErrorMessage()
* @method setIncludeWatermark(int $includeWatermark)
* @method int getIncludeWatermark()
* @method setPreferStreaming(int $preferStreaming)
* @method int getPreferStreaming()
*/
class Task extends Entity {
protected $lastUpdated;
@ -72,16 +74,17 @@ class Task extends Entity {
protected $allowCleanup;
protected $userFacingErrorMessage;
protected $includeWatermark;
protected $preferStreaming;
/**
* @var string[]
*/
public const COLUMNS = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark'];
public const COLUMNS = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark', 'prefer_streaming'];
/**
* @var string[]
*/
public const FIELDS = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark'];
public const FIELDS = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark', 'preferStreaming'];
public function __construct() {
// add types in constructor
@ -105,6 +108,7 @@ class Task extends Entity {
$this->addType('allowCleanup', 'integer');
$this->addType('userFacingErrorMessage', 'string');
$this->addType('includeWatermark', 'integer');
$this->addType('preferStreaming', 'integer');
}
public function toRow(): array {
@ -136,6 +140,7 @@ class Task extends Entity {
'allowCleanup' => $task->getAllowCleanup() ? 1 : 0,
'userFacingErrorMessage' => $task->getUserFacingErrorMessage(),
'includeWatermark' => $task->getIncludeWatermark() ? 1 : 0,
'preferStreaming' => $task->getPreferStreaming() ? 1 : 0,
]);
return $taskEntity;
}
@ -161,6 +166,7 @@ class Task extends Entity {
$task->setAllowCleanup($this->getAllowCleanup() !== 0);
$task->setUserFacingErrorMessage($this->getUserFacingErrorMessage());
$task->setIncludeWatermark($this->getIncludeWatermark() !== 0);
$task->setPreferStreaming($this->getPreferStreaming() !== 0);
return $task;
}
}

View file

@ -19,6 +19,7 @@ use OCA\Guests\UserBackend;
use OCP\App\IAppManager;
use OCP\AppFramework\Db\DoesNotExistException;
use OCP\AppFramework\Db\MultipleObjectsReturnedException;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\DB\Exception;
use OCP\EventDispatcher\IEventDispatcher;
@ -59,12 +60,14 @@ use OCP\TaskProcessing\Exception\ValidationException;
use OCP\TaskProcessing\IInternalTaskType;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\IProvider;
use OCP\TaskProcessing\ISynchronousOptionsAwareProvider;
use OCP\TaskProcessing\ISynchronousProvider;
use OCP\TaskProcessing\ISynchronousWatermarkingProvider;
use OCP\TaskProcessing\ITaskType;
use OCP\TaskProcessing\ITriggerableProvider;
use OCP\TaskProcessing\ShapeDescriptor;
use OCP\TaskProcessing\ShapeEnumValue;
use OCP\TaskProcessing\SynchronousProviderOptions;
use OCP\TaskProcessing\Task;
use OCP\TaskProcessing\TaskTypes\AnalyzeImages;
use OCP\TaskProcessing\TaskTypes\AudioToAudioChat;
@ -156,6 +159,7 @@ class Manager implements IManager {
private IUserSession $userSession,
ICacheFactory $cacheFactory,
private IFactory $l10nFactory,
private ITimeFactory $timeFactory,
) {
$this->appData = $appDataFactory->get('core');
$this->distributedCache = $cacheFactory->createDistributed('task_processing::');
@ -1068,6 +1072,7 @@ class Manager implements IManager {
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$this->storeTask($task);
$this->notifyTaskStatus($task, Task::STATUS_SCHEDULED);
// schedule synchronous job if the provider is synchronous
$provider = $this->getPreferredProvider($task->getTaskTypeId());
if ($provider instanceof ISynchronousProvider) {
@ -1111,6 +1116,7 @@ class Manager implements IManager {
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$this->storeTask($task);
$this->notifyTaskStatus($task, Task::STATUS_SCHEDULED);
$this->processTask($task, $provider);
$task = $this->getTask($task->getId());
} else {
@ -1138,6 +1144,18 @@ class Manager implements IManager {
$this->setTaskStatus($task, Task::STATUS_RUNNING);
if ($provider instanceof ISynchronousWatermarkingProvider) {
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark());
} elseif ($provider instanceof ISynchronousOptionsAwareProvider) {
$options = new SynchronousProviderOptions(
$task->getIncludeWatermark(),
$task->getPreferStreaming(),
fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output),
);
$output = $provider->process(
$task->getUserId(),
$input,
fn (float $progress) => $this->setTaskProgress($task->getId(), $progress),
$options,
);
} else {
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
}
@ -1166,6 +1184,31 @@ class Manager implements IManager {
$this->taskMapper->delete($taskEntity);
}
private function notifyTaskStatus(Task $task, int $status): void {
$userId = $task->getUserId();
if ($userId !== null
&& $userId !== ''
&& $this->appManager->isEnabledForAnyone('notify_push')
&& interface_exists('\OCA\NotifyPush\Queue\IQueue')
) {
try {
/** @psalm-suppress UndefinedClass */
$queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class);
/** @psalm-suppress UndefinedClass */
$queue->push('notify_custom', [
'user' => $userId,
'message' => 'taskprocessing:task_update',
'body' => [
'task_id' => $task->getId(),
'new_status' => $status,
],
]);
} catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) {
$this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue');
}
}
}
#[\Override]
public function getTask(int $id): Task {
try {
@ -1188,9 +1231,11 @@ class Manager implements IManager {
}
$task->setStatus(Task::STATUS_CANCELLED);
$task->setEndedAt(time());
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
$this->notifyTaskStatus($task, Task::STATUS_CANCELLED);
$this->runWebhook($task);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
@ -1211,6 +1256,56 @@ class Manager implements IManager {
$task->setStatus(Task::STATUS_RUNNING);
$task->setProgress($progress);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
$this->notifyTaskStatus($task, Task::STATUS_RUNNING);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
}
return true;
}
#[\Override]
public function setTaskIntermediateOutput(int $id, array $output): bool {
// TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently
try {
$task = $this->getTask($id);
} catch (NotFoundException|\OCP\TaskProcessing\Exception\Exception $e) {
$this->logger->debug('Couldn\'t find task, not sending intermediate output', ['exception' => $e, 'task_id' => $id]);
return false;
}
if ($task->getStatus() !== Task::STATUS_RUNNING) {
return false;
}
$userId = $task->getUserId();
if ($userId !== null
&& $userId !== ''
&& $this->appManager->isEnabledForAnyone('notify_push')
&& interface_exists('\OCA\NotifyPush\Queue\IQueue')
) {
try {
/** @psalm-suppress UndefinedClass */
$queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class);
/** @psalm-suppress UndefinedClass */
$queue->push('notify_custom', [
'user' => $userId,
'message' => 'taskprocessing:task_id_' . $task->getId(),
'body' => $output,
]);
} catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) {
$this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue');
}
}
// throttle DB update
$now = $this->timeFactory->now()->getTimestamp();
if ($now - $task->getLastUpdated() < 2) {
return true;
}
// no output shape validation for now
$task->setOutput($output);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
} catch (\OCP\DB\Exception $e) {
@ -1302,6 +1397,7 @@ class Manager implements IManager {
}
try {
$this->taskMapper->update($taskEntity);
$this->notifyTaskStatus($task, $task->getStatus());
$this->runWebhook($task);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception($e->getMessage());
@ -1512,6 +1608,7 @@ class Manager implements IManager {
return false;
}
$task->setStatus(Task::STATUS_RUNNING);
$this->notifyTaskStatus($task, Task::STATUS_RUNNING);
return true;
}
@ -1532,6 +1629,7 @@ class Manager implements IManager {
$task->setStatus($status);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->update($taskEntity);
$this->notifyTaskStatus($task, $status);
}
/**

View file

@ -142,6 +142,19 @@ interface IManager {
*/
public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void;
/**
* Set the task intermediate output.
* If notify_push is available, the output will be pushed to the user and the task will be updated in the DB every 2 seconds at most.
*
* @param int $id The id of the task
* @param array $output The intermediate output
* @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime
* @throws Exception If the query failed
* @throws NotFoundException If the task could not be found
* @since 35.0.0
*/
public function setTaskIntermediateOutput(int $id, array $output): bool;
/**
* @param int $id
* @param float $progress

View file

@ -0,0 +1,40 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OCP\TaskProcessing;
use OCP\Files\File;
use OCP\TaskProcessing\Exception\ProcessingException;
/**
* This is the interface that is implemented by apps that
* implement a task processing provider
* @since 35.0.0
*/
interface ISynchronousOptionsAwareProvider extends ISynchronousProvider {
/**
* Returns the shape of optional output parameters
*
* @param null|string $userId The user that created the current task
* @param array<string, list<numeric|string|File>|numeric|string|File> $input The task input
* @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped.
* @param SynchronousProviderOptions $options The task options
* @psalm-return array<string, list<numeric|string>|numeric|string>
* @throws ProcessingException
* @since 35.0.0
*/
#[\Override]
public function process(
?string $userId,
array $input,
callable $reportProgress,
SynchronousProviderOptions $options = new SynchronousProviderOptions(),
): array;
}

View file

@ -16,6 +16,7 @@ use OCP\TaskProcessing\Exception\ProcessingException;
* This is the interface that is implemented by apps that
* implement a task processing provider that supports watermarking
* @since 33.0.0
* @deprecated 35.0.0 Use ISynchronousOptionsProvider instead
*/
interface ISynchronousWatermarkingProvider extends ISynchronousProvider {

View file

@ -0,0 +1,61 @@
<?php
/**
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OCP\TaskProcessing;
/**
* @since 35.0.0
*/
class SynchronousProviderOptions {
private \Closure $reportIntermediateOutput;
/**
* @param bool $includeWatermarks Whether to include the watermark in the media output files or not
* @param bool $preferStreaming Whether to prefer streaming the output or not
* @param null|callable $reportIntermediateOutput Callback for the provider to report the intermediate output (streaming)
* @return void
* @since 35.0.0
*/
public function __construct(
private readonly bool $includeWatermarks = false,
private readonly bool $preferStreaming = true,
?callable $reportIntermediateOutput = null,
) {
$this->reportIntermediateOutput = $reportIntermediateOutput !== null
? \Closure::fromCallable($reportIntermediateOutput)
: static function (array $output): bool {
return true;
};
}
/**
* Get the includeWatermarks option value
* @return bool Whether to include the watermark in the media output files or not
* @since 35.0.0
*/
public function getIncludeWatermarks(): bool {
return $this->includeWatermarks;
}
/**
* Get the preferStreaming option value
* @return bool Whether to prefer streaming the output or not
* @since 35.0.0
*/
public function getPreferStreaming(): bool {
return $this->preferStreaming;
}
/**
* Get the reportOutput option value
* @return callable Callback for the provider to report the intermediate output (streaming)
* @since 35.0.0
*/
public function getReportIntermediateOutput(): callable {
return $this->reportIntermediateOutput;
}
}

View file

@ -51,6 +51,8 @@ final class Task implements \JsonSerializable {
protected bool $includeWatermark = true;
protected bool $preferStreaming = true;
/**
* @since 30.0.0
*/
@ -294,7 +296,23 @@ final class Task implements \JsonSerializable {
}
/**
* @psalm-return array{id: int, lastUpdated: int, type: string, status: 'STATUS_CANCELLED'|'STATUS_FAILED'|'STATUS_SUCCESSFUL'|'STATUS_RUNNING'|'STATUS_SCHEDULED'|'STATUS_UNKNOWN', userId: ?string, appId: string, input: array<string, list<numeric|string>|numeric|string>, output: ?array<string, list<numeric|string>|numeric|string>, customId: ?string, completionExpectedAt: ?int, progress: ?float, scheduledAt: ?int, startedAt: ?int, endedAt: ?int, allowCleanup: bool, includeWatermark: bool, userFacingErrorMessage: ?string}
* @return bool
* @since 35.0.0
*/
final public function getPreferStreaming(): bool {
return $this->preferStreaming;
}
/**
* @param bool $preferStreaming
* @since 35.0.0
*/
final public function setPreferStreaming(bool $preferStreaming): void {
$this->preferStreaming = $preferStreaming;
}
/**
* @psalm-return array{id: int, lastUpdated: int, type: string, status: 'STATUS_CANCELLED'|'STATUS_FAILED'|'STATUS_SUCCESSFUL'|'STATUS_RUNNING'|'STATUS_SCHEDULED'|'STATUS_UNKNOWN', userId: ?string, appId: string, input: array<string, list<numeric|string>|numeric|string>, output: ?array<string, list<numeric|string>|numeric|string>, customId: ?string, completionExpectedAt: ?int, progress: ?float, scheduledAt: ?int, startedAt: ?int, endedAt: ?int, allowCleanup: bool, includeWatermark: bool, userFacingErrorMessage: ?string, preferStreaming: bool}
* @since 30.0.0
*/
#[\Override]
@ -317,6 +335,7 @@ final class Task implements \JsonSerializable {
'allowCleanup' => $this->getAllowCleanup(),
'includeWatermark' => $this->getIncludeWatermark(),
'userFacingErrorMessage' => $this->getUserFacingErrorMessage(),
'preferStreaming' => $this->getPreferStreaming(),
];
}

View file

@ -710,7 +710,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -791,6 +792,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},
@ -15725,6 +15729,240 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": {
"post": {
"operationId": "core-task_processing_api-set-intermediate-result",
"summary": "Sets the task intermediate result while it is running",
"description": "This endpoint requires admin access",
"tags": [
"core/task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"output"
],
"properties": {
"output": {
"type": "object",
"description": "The intermediate task output, files are represented by their IDs",
"additionalProperties": {
"type": "object"
}
}
}
}
}
}
},
"parameters": [
{
"name": "taskId",
"in": "path",
"description": "The id of the task",
"required": true,
"schema": {
"type": "integer",
"format": "int64"
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Result updated successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"task"
],
"properties": {
"task": {
"$ref": "#/components/schemas/CoreTaskProcessingTask"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"404": {
"description": "Task not found",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": {
"post": {
"operationId": "core-task_processing_api-cancel-task-ex-app-endpoint",

View file

@ -855,6 +855,7 @@ class TaskProcessingTest extends \Test\TestCase {
Server::get(IUserSession::class),
Server::get(ICacheFactory::class),
Server::get(IFactory::class),
Server::get(ITimeFactory::class),
);
}
@ -1595,6 +1596,7 @@ class TaskProcessingTest extends \Test\TestCase {
Server::get(IUserSession::class),
Server::get(ICacheFactory::class),
Server::get(IFactory::class),
Server::get(ITimeFactory::class),
);
}

View file

@ -11,7 +11,7 @@ declare(strict_types=1);
// between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level
// when updating major/minor version number.
$OC_Version = [35, 0, 0, 0];
$OC_Version = [35, 0, 0, 1];
// The human-readable string
$OC_VersionString = '35.0.0 dev';