feat(task-streaming): add preferStreaming boolean attribute to taskprocessing tasks. pass it to ISynchronousProgressiveProvider::process

Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
This commit is contained in:
Julien Veyssier 2026-05-27 16:48:45 +02:00
parent 2e81484ccd
commit 605d493a25
No known key found for this signature in database
GPG key ID: 4141FEE162030638
13 changed files with 802 additions and 10 deletions

View file

@ -0,0 +1,50 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\Core\Migrations;
use Closure;
use OCP\DB\ISchemaWrapper;
use OCP\DB\Types;
use OCP\Migration\Attributes\AddColumn;
use OCP\Migration\Attributes\ColumnType;
use OCP\Migration\IOutput;
use OCP\Migration\SimpleMigrationStep;
/**
*
*/
#[AddColumn(table: 'taskprocessing_tasks', name: 'prefer_streaming', type: ColumnType::SMALLINT)]
class Version35000Date20260527162338 extends SimpleMigrationStep {
/**
* @param IOutput $output
* @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
* @param array $options
* @return null|ISchemaWrapper
*/
#[\Override]
public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
/** @var ISchemaWrapper $schema */
$schema = $schemaClosure();
if ($schema->hasTable('taskprocessing_tasks')) {
$table = $schema->getTable('taskprocessing_tasks');
if (!$table->hasColumn('prefer_streaming')) {
$table->addColumn('prefer_streaming', Types::SMALLINT, [
'notnull' => true,
'default' => 1,
'unsigned' => true,
]);
return $schema;
}
}
return null;
}
}

View file

@ -213,6 +213,7 @@ namespace OC\Core;
* allowCleanup: bool,
* includeWatermark: bool,
* userFacingErrorMessage: ?string,
* preferStreaming: bool,
* }
*
* @psalm-type CoreProfileAction = array{

View file

@ -204,7 +204,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -285,6 +286,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},
@ -2207,6 +2211,238 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": {
"post": {
"operationId": "task_processing_api-set-intermediate-result",
"summary": "Sets the task intermediate result while it is running",
"description": "This endpoint requires admin access",
"tags": [
"task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"requestBody": {
"required": false,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"output": {
"type": "object",
"nullable": true,
"description": "The intermediate task output, files are represented by their IDs",
"additionalProperties": {
"type": "object"
}
}
}
}
}
}
},
"parameters": [
{
"name": "taskId",
"in": "path",
"description": "The id of the task",
"required": true,
"schema": {
"type": "integer",
"format": "int64"
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Result updated successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"task"
],
"properties": {
"task": {
"$ref": "#/components/schemas/TaskProcessingTask"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"404": {
"description": "Task not found",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": {
"post": {
"operationId": "task_processing_api-cancel-task-ex-app-endpoint",

View file

@ -664,7 +664,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -745,6 +746,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},
@ -12017,6 +12021,238 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": {
"post": {
"operationId": "task_processing_api-set-intermediate-result",
"summary": "Sets the task intermediate result while it is running",
"description": "This endpoint requires admin access",
"tags": [
"task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"requestBody": {
"required": false,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"output": {
"type": "object",
"nullable": true,
"description": "The intermediate task output, files are represented by their IDs",
"additionalProperties": {
"type": "object"
}
}
}
}
}
}
},
"parameters": [
{
"name": "taskId",
"in": "path",
"description": "The id of the task",
"required": true,
"schema": {
"type": "integer",
"format": "int64"
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Result updated successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"task"
],
"properties": {
"task": {
"$ref": "#/components/schemas/TaskProcessingTask"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"404": {
"description": "Task not found",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": {
"post": {
"operationId": "task_processing_api-cancel-task-ex-app-endpoint",

View file

@ -664,7 +664,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -745,6 +746,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},

View file

@ -1610,6 +1610,7 @@ return array(
'OC\\Core\\Migrations\\Version34000Date20260415161745' => $baseDir . '/core/Migrations/Version34000Date20260415161745.php',
'OC\\Core\\Migrations\\Version34000Date20260518163022' => $baseDir . '/core/Migrations/Version34000Date20260518163022.php',
'OC\\Core\\Migrations\\Version34000Date20260521110333' => $baseDir . '/core/Migrations/Version34000Date20260521110333.php',
'OC\\Core\\Migrations\\Version35000Date20260527162338' => $baseDir . '/core/Migrations/Version35000Date20260527162338.php',
'OC\\Core\\Notification\\CoreNotifier' => $baseDir . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => $baseDir . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\CronService' => $baseDir . '/core/Service/CronService.php',

View file

@ -1651,6 +1651,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
'OC\\Core\\Migrations\\Version34000Date20260415161745' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260415161745.php',
'OC\\Core\\Migrations\\Version34000Date20260518163022' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260518163022.php',
'OC\\Core\\Migrations\\Version34000Date20260521110333' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260521110333.php',
'OC\\Core\\Migrations\\Version35000Date20260527162338' => __DIR__ . '/../../..' . '/core/Migrations/Version35000Date20260527162338.php',
'OC\\Core\\Notification\\CoreNotifier' => __DIR__ . '/../../..' . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => __DIR__ . '/../../..' . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\CronService' => __DIR__ . '/../../..' . '/core/Service/CronService.php',

View file

@ -51,6 +51,8 @@ use OCP\TaskProcessing\Task as OCPTask;
* @method null|string getUserFacingErrorMessage()
* @method setIncludeWatermark(int $includeWatermark)
* @method int getIncludeWatermark()
* @method setPreferStreaming(int $preferStreaming)
* @method int getPreferStreaming()
*/
class Task extends Entity {
protected $lastUpdated;
@ -72,16 +74,17 @@ class Task extends Entity {
protected $allowCleanup;
protected $userFacingErrorMessage;
protected $includeWatermark;
protected $preferStreaming;
/**
* @var string[]
*/
public const COLUMNS = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark'];
public const COLUMNS = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark', 'prefer_streaming'];
/**
* @var string[]
*/
public const FIELDS = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark'];
public const FIELDS = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark', 'preferStreaming'];
public function __construct() {
// add types in constructor
@ -105,6 +108,7 @@ class Task extends Entity {
$this->addType('allowCleanup', 'integer');
$this->addType('userFacingErrorMessage', 'string');
$this->addType('includeWatermark', 'integer');
$this->addType('preferStreaming', 'integer');
}
public function toRow(): array {
@ -136,6 +140,7 @@ class Task extends Entity {
'allowCleanup' => $task->getAllowCleanup() ? 1 : 0,
'userFacingErrorMessage' => $task->getUserFacingErrorMessage(),
'includeWatermark' => $task->getIncludeWatermark() ? 1 : 0,
'preferStreaming' => $task->getPreferStreaming() ? 1 : 0,
]);
return $taskEntity;
}
@ -161,6 +166,7 @@ class Task extends Entity {
$task->setAllowCleanup($this->getAllowCleanup() !== 0);
$task->setUserFacingErrorMessage($this->getUserFacingErrorMessage());
$task->setIncludeWatermark($this->getIncludeWatermark() !== 0);
$task->setPreferStreaming($this->getPreferStreaming() !== 0);
return $task;
}
}

View file

@ -1144,7 +1144,8 @@ class Manager implements IManager {
$task->getUserId(),
$input,
fn (float $progress) => $this->setTaskProgress($task->getId(), $progress),
fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output)
fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output),
$task->getPreferStreaming()
);
} else {
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));

View file

@ -27,10 +27,11 @@ interface ISynchronousProgressiveProvider extends ISynchronousProvider {
* @param array<string, list<numeric|string|File>|numeric|string|File> $input The task input
* @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped.
* @param null|callable(array):bool $reportOutput Set the task intermediate output
* @param bool $preferStreaming Whether to prefer streaming output or not
* @psalm-return array<string, list<numeric|string>|numeric|string>
* @throws ProcessingException
* @since 33.0.0
* @since 35.0.0
*/
#[\Override]
public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array;
public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null, bool $preferStreaming = true): array;
}

View file

@ -51,6 +51,8 @@ final class Task implements \JsonSerializable {
protected bool $includeWatermark = true;
protected bool $preferStreaming = true;
/**
* @since 30.0.0
*/
@ -293,6 +295,22 @@ final class Task implements \JsonSerializable {
$this->includeWatermark = $includeWatermark;
}
/**
* @return bool
* @since 35.0.0
*/
final public function getPreferStreaming(): bool {
return $this->preferStreaming;
}
/**
* @param bool $preferStreaming
* @since 35.0.0
*/
final public function setPreferStreaming(bool $preferStreaming): void {
$this->preferStreaming = $preferStreaming;
}
/**
* @psalm-return array{id: int, lastUpdated: int, type: string, status: 'STATUS_CANCELLED'|'STATUS_FAILED'|'STATUS_SUCCESSFUL'|'STATUS_RUNNING'|'STATUS_SCHEDULED'|'STATUS_UNKNOWN', userId: ?string, appId: string, input: array<string, list<numeric|string>|numeric|string>, output: ?array<string, list<numeric|string>|numeric|string>, customId: ?string, completionExpectedAt: ?int, progress: ?float, scheduledAt: ?int, startedAt: ?int, endedAt: ?int, allowCleanup: bool, includeWatermark: bool, userFacingErrorMessage: ?string}
* @since 30.0.0
@ -317,6 +335,7 @@ final class Task implements \JsonSerializable {
'allowCleanup' => $this->getAllowCleanup(),
'includeWatermark' => $this->getIncludeWatermark(),
'userFacingErrorMessage' => $this->getUserFacingErrorMessage(),
'preferStreaming' => $this->getPreferStreaming(),
];
}

View file

@ -710,7 +710,8 @@
"endedAt",
"allowCleanup",
"includeWatermark",
"userFacingErrorMessage"
"userFacingErrorMessage",
"preferStreaming"
],
"properties": {
"id": {
@ -791,6 +792,9 @@
"userFacingErrorMessage": {
"type": "string",
"nullable": true
},
"preferStreaming": {
"type": "boolean"
}
}
},
@ -15725,6 +15729,238 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": {
"post": {
"operationId": "core-task_processing_api-set-intermediate-result",
"summary": "Sets the task intermediate result while it is running",
"description": "This endpoint requires admin access",
"tags": [
"core/task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"requestBody": {
"required": false,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"output": {
"type": "object",
"nullable": true,
"description": "The intermediate task output, files are represented by their IDs",
"additionalProperties": {
"type": "object"
}
}
}
}
}
}
},
"parameters": [
{
"name": "taskId",
"in": "path",
"description": "The id of the task",
"required": true,
"schema": {
"type": "integer",
"format": "int64"
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Result updated successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"task"
],
"properties": {
"task": {
"$ref": "#/components/schemas/CoreTaskProcessingTask"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"404": {
"description": "Task not found",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": {
"post": {
"operationId": "core-task_processing_api-cancel-task-ex-app-endpoint",

View file

@ -11,7 +11,7 @@ declare(strict_types=1);
// between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level
// when updating major/minor version number.
$OC_Version = [35, 0, 0, 0];
$OC_Version = [35, 0, 0, 1];
// The human-readable string
$OC_VersionString = '35.0.0 dev';