mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-02-19 02:57:59 -05:00
[v14.0/forgejo] fix: empty dynamic matrix can leave action run hanging incomplete (#11072)
Some checks are pending
/ release (push) Waiting to run
testing-integration / test-unit (push) Waiting to run
testing-integration / test-sqlite (push) Waiting to run
testing-integration / test-mariadb (v10.6) (push) Waiting to run
testing-integration / test-mariadb (v11.8) (push) Waiting to run
testing / backend-checks (push) Waiting to run
testing / frontend-checks (push) Waiting to run
testing / test-unit (push) Blocked by required conditions
testing / test-e2e (push) Blocked by required conditions
testing / test-remote-cacher (redis) (push) Blocked by required conditions
testing / test-remote-cacher (valkey) (push) Blocked by required conditions
testing / test-remote-cacher (garnet) (push) Blocked by required conditions
testing / test-remote-cacher (redict) (push) Blocked by required conditions
testing / test-mysql (push) Blocked by required conditions
testing / test-pgsql (push) Blocked by required conditions
testing / test-sqlite (push) Blocked by required conditions
testing / security-check (push) Blocked by required conditions
Some checks are pending
/ release (push) Waiting to run
testing-integration / test-unit (push) Waiting to run
testing-integration / test-sqlite (push) Waiting to run
testing-integration / test-mariadb (v10.6) (push) Waiting to run
testing-integration / test-mariadb (v11.8) (push) Waiting to run
testing / backend-checks (push) Waiting to run
testing / frontend-checks (push) Waiting to run
testing / test-unit (push) Blocked by required conditions
testing / test-e2e (push) Blocked by required conditions
testing / test-remote-cacher (redis) (push) Blocked by required conditions
testing / test-remote-cacher (valkey) (push) Blocked by required conditions
testing / test-remote-cacher (garnet) (push) Blocked by required conditions
testing / test-remote-cacher (redict) (push) Blocked by required conditions
testing / test-mysql (push) Blocked by required conditions
testing / test-pgsql (push) Blocked by required conditions
testing / test-sqlite (push) Blocked by required conditions
testing / security-check (push) Blocked by required conditions
**Backport:** #11063
Fixes #11030.
When a `strategy.matrix` needs to be evaluated on the output of another job, it can become evaluated into an empty set of jobs. In this case, and assuming no other jobs in the run are active, the run should reach a settled state. The logic to check the other jobs in the run and determine if this state has been hit needs to be explicitly added to the job emitter.
To accomplish this change, this action run state logic was extracted out of `UpdateRunJobWithoutNotification` where it could be reused.
(cherry picked from commit c198082975)
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11072
Reviewed-by: Gusted <gusted@noreply.codeberg.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
parent
9347d06de5
commit
032b0bbeda
8 changed files with 274 additions and 32 deletions
|
|
@ -500,4 +500,35 @@ func UpdateRunWithoutNotification(ctx context.Context, run *ActionRun, cols ...s
|
|||
return nil
|
||||
}
|
||||
|
||||
// Compute the Status, Started, and Stopped fields of an ActionRun based upon the current job state within the run.
|
||||
// Returned is the [ActionRun] with modifications if necessary, a slice of column names that have been updated, or an
|
||||
// error if the calculation failed. The caller is responsible for then invoking [actions_service.UpdateRun] for an
|
||||
// update with notifications, or [actions_model.UpdateRunWithoutNotification] if notifications are already handled.
|
||||
func ComputeRunStatus(ctx context.Context, runID int64) (run *ActionRun, columns []string, err error) {
|
||||
run, err = GetRunByID(ctx, runID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
jobs, err := GetRunJobsByRunID(ctx, runID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
newStatus := AggregateJobStatus(jobs)
|
||||
if run.Status != newStatus {
|
||||
run.Status = newStatus
|
||||
columns = append(columns, "status")
|
||||
}
|
||||
if run.Started.IsZero() && run.Status.IsRunning() {
|
||||
run.Started = timeutil.TimeStampNow()
|
||||
columns = append(columns, "started")
|
||||
}
|
||||
if run.Stopped.IsZero() && run.Status.IsDone() {
|
||||
run.Stopped = timeutil.TimeStampNow()
|
||||
columns = append(columns, "stopped")
|
||||
}
|
||||
|
||||
return run, columns, nil
|
||||
}
|
||||
|
||||
type ActionRunIndex db.ResourceIndex
|
||||
|
|
|
|||
|
|
@ -174,38 +174,12 @@ func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, con
|
|||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Other goroutines may aggregate the status of the run and update it too.
|
||||
// So we need load the run and its jobs before updating the run.
|
||||
run, err := GetRunByID(ctx, job.RunID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
updateRequired := false
|
||||
newStatus := AggregateJobStatus(jobs)
|
||||
if run.Status != newStatus {
|
||||
run.Status = newStatus
|
||||
updateRequired = true
|
||||
}
|
||||
if run.Started.IsZero() && run.Status.IsRunning() {
|
||||
run.Started = timeutil.TimeStampNow()
|
||||
updateRequired = true
|
||||
}
|
||||
if run.Stopped.IsZero() && run.Status.IsDone() {
|
||||
run.Stopped = timeutil.TimeStampNow()
|
||||
updateRequired = true
|
||||
}
|
||||
if updateRequired {
|
||||
// As the caller has to ensure the ActionRunNowDone notification is sent we can ignore doing so here.
|
||||
if err := UpdateRunWithoutNotification(ctx, run, "status", "started", "stopped"); err != nil {
|
||||
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
|
||||
}
|
||||
}
|
||||
run, columns, err := ComputeRunStatus(ctx, job.RunID)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("compute run status: %w", err)
|
||||
}
|
||||
if err := UpdateRunWithoutNotification(ctx, run, columns...); err != nil {
|
||||
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
|
||||
}
|
||||
|
||||
return affected, nil
|
||||
|
|
|
|||
|
|
@ -272,3 +272,122 @@ jobs:
|
|||
// Expect job with an incomplete runs-on to be StatusBlocked:
|
||||
assert.Equal(t, StatusBlocked, job.Status)
|
||||
}
|
||||
|
||||
func TestComputeRunStatus(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
t.Run("no changes", func(t *testing.T) {
|
||||
run, columns, err := ComputeRunStatus(t.Context(), 791)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, StatusSuccess, run.Status)
|
||||
assert.NotContains(t, columns, "status")
|
||||
assert.EqualValues(t, 1683636528, run.Started)
|
||||
assert.NotContains(t, columns, "started")
|
||||
assert.EqualValues(t, 1683636626, run.Stopped)
|
||||
assert.NotContains(t, columns, "stopped")
|
||||
})
|
||||
|
||||
t.Run("change status", func(t *testing.T) {
|
||||
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
|
||||
job.Status = StatusFailure
|
||||
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
run, columns, err := ComputeRunStatus(t.Context(), 791)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, StatusFailure, run.Status)
|
||||
assert.Contains(t, columns, "status")
|
||||
assert.NotContains(t, columns, "started")
|
||||
assert.NotContains(t, columns, "stopped")
|
||||
})
|
||||
|
||||
t.Run("won't change started if not running", func(t *testing.T) {
|
||||
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
|
||||
job.Status = StatusBlocked
|
||||
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
|
||||
preRun.Started = 0
|
||||
affected, err = db.GetEngine(t.Context()).Cols("started").ID(preRun.ID).Update(preRun)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
run, columns, err := ComputeRunStatus(t.Context(), 791)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, StatusBlocked, run.Status)
|
||||
assert.EqualValues(t, 0, run.Started)
|
||||
assert.Contains(t, columns, "status")
|
||||
assert.NotContains(t, columns, "started")
|
||||
assert.NotContains(t, columns, "stopped")
|
||||
})
|
||||
|
||||
t.Run("change started", func(t *testing.T) {
|
||||
// Need the job to be "Running" for started to appear to change
|
||||
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
|
||||
job.Status = StatusRunning
|
||||
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
|
||||
preRun.Started = 0
|
||||
affected, err = db.GetEngine(t.Context()).Cols("started").ID(preRun.ID).Update(preRun)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
run, columns, err := ComputeRunStatus(t.Context(), 791)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, StatusRunning, run.Status)
|
||||
assert.NotEqualValues(t, 0, run.Started)
|
||||
assert.Contains(t, columns, "status")
|
||||
assert.Contains(t, columns, "started")
|
||||
assert.NotContains(t, columns, "stopped")
|
||||
})
|
||||
|
||||
t.Run("won't change stopped if not done", func(t *testing.T) {
|
||||
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
|
||||
job.Status = StatusRunning
|
||||
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
|
||||
preRun.Stopped = 0
|
||||
affected, err = db.GetEngine(t.Context()).Cols("stopped").ID(preRun.ID).Update(preRun)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
run, columns, err := ComputeRunStatus(t.Context(), 791)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, StatusRunning, run.Status)
|
||||
assert.EqualValues(t, 0, run.Stopped)
|
||||
assert.Contains(t, columns, "status")
|
||||
assert.NotContains(t, columns, "stopped")
|
||||
})
|
||||
|
||||
t.Run("change stopped", func(t *testing.T) {
|
||||
// Need the job to be some version of Done for stopped to appear to change
|
||||
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
|
||||
job.Status = StatusSuccess
|
||||
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
|
||||
preRun.Stopped = 0
|
||||
affected, err = db.GetEngine(t.Context()).Cols("stopped").ID(preRun.ID).Update(preRun)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
run, columns, err := ComputeRunStatus(t.Context(), 791)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, StatusSuccess, run.Status)
|
||||
assert.NotEqualValues(t, 0, run.Stopped)
|
||||
assert.NotContains(t, columns, "status")
|
||||
assert.NotContains(t, columns, "started")
|
||||
assert.Contains(t, columns, "stopped")
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -283,3 +283,22 @@
|
|||
need_approval: 0
|
||||
approved_by: 0
|
||||
concurrency_group: abc123
|
||||
-
|
||||
id: 922
|
||||
title: "running workflow_dispatch run"
|
||||
repo_id: 63
|
||||
owner_id: 2
|
||||
workflow_id: "running.yaml"
|
||||
index: 26
|
||||
trigger_user_id: 2
|
||||
ref: "refs/heads/main"
|
||||
commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee"
|
||||
trigger_event: "workflow_dispatch"
|
||||
is_fork_pull_request: 0
|
||||
status: 6 # running
|
||||
started: 1683636528
|
||||
created: 1683636108
|
||||
updated: 1683636626
|
||||
need_approval: 0
|
||||
approved_by: 0
|
||||
concurrency_group: abc123
|
||||
|
|
|
|||
|
|
@ -596,3 +596,44 @@
|
|||
task_id: 107
|
||||
status: 1 # success
|
||||
runs_on: '["fedora"]'
|
||||
|
||||
-
|
||||
id: 642
|
||||
run_id: 922
|
||||
repo_id: 63
|
||||
owner_id: 2
|
||||
commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee
|
||||
is_fork_pull_request: 0
|
||||
name: job2
|
||||
attempt: 0
|
||||
job_id: job2
|
||||
task_id: 0
|
||||
status: 7 # blocked
|
||||
runs_on: '[]'
|
||||
needs: '["job1"]'
|
||||
workflow_payload: |
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
jobs:
|
||||
job2:
|
||||
runs-on: default
|
||||
strategy:
|
||||
matrix:
|
||||
name: ${{ fromJSON(needs.job1.outputs.empty-list) }}
|
||||
steps:
|
||||
- run: echo "${{ matrix.name }}"
|
||||
incomplete_matrix: true
|
||||
-
|
||||
id: 643
|
||||
run_id: 922
|
||||
repo_id: 63
|
||||
owner_id: 2
|
||||
commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee
|
||||
is_fork_pull_request: 0
|
||||
name: job1
|
||||
attempt: 0
|
||||
job_id: job1
|
||||
task_id: 110
|
||||
status: 1 # success
|
||||
runs_on: '["fedora"]'
|
||||
|
|
|
|||
|
|
@ -48,3 +48,8 @@
|
|||
task_id: 107
|
||||
output_key: run-on-this
|
||||
output_value: nixos-25.11
|
||||
-
|
||||
id: 112
|
||||
task_id: 110
|
||||
output_key: empty-list
|
||||
output_value: "[]"
|
||||
|
|
|
|||
|
|
@ -273,6 +273,19 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac
|
|||
return fmt.Errorf("unexpected record count in delete incomplete_matrix=true job with ID %d; count = %d", blockedJob.ID, count)
|
||||
}
|
||||
|
||||
// If len(newJobWorkflows) is 0, and blockedJob was the last job in this run, then the job will be complete --
|
||||
// ComputeRunStatus will check for that state.
|
||||
run, columns, err := actions_model.ComputeRunStatus(ctx, blockedJob.RunID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("compute run status: %w", err)
|
||||
}
|
||||
if len(columns) != 0 {
|
||||
err := UpdateRun(ctx, run, columns...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update run: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
|
|
@ -11,6 +12,7 @@ import (
|
|||
"forgejo.org/models/db"
|
||||
"forgejo.org/models/unittest"
|
||||
"forgejo.org/modules/test"
|
||||
notify_service "forgejo.org/services/notify"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v12/act/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
|
@ -142,6 +144,20 @@ jobs:
|
|||
}
|
||||
}
|
||||
|
||||
type callArgsActionRunNowDone struct {
|
||||
run *actions_model.ActionRun
|
||||
priorStatus actions_model.Status
|
||||
lastRun *actions_model.ActionRun
|
||||
}
|
||||
type mockNotifier struct {
|
||||
notify_service.NullNotifier
|
||||
calls []*callArgsActionRunNowDone
|
||||
}
|
||||
|
||||
func (m *mockNotifier) ActionRunNowDone(ctx context.Context, run *actions_model.ActionRun, priorStatus actions_model.Status, lastRun *actions_model.ActionRun) {
|
||||
m.calls = append(m.calls, &callArgsActionRunNowDone{run, priorStatus, lastRun})
|
||||
}
|
||||
|
||||
func Test_tryHandleIncompleteMatrix(t *testing.T) {
|
||||
// Shouldn't get any decoding errors during this test -- pop them up from a log warning to a test fatal error.
|
||||
defer test.MockVariableValue(&model.OnDecodeNodeError, func(node yaml.Node, out any, err error) {
|
||||
|
|
@ -157,6 +173,7 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
|
|||
preExecutionError actions_model.PreExecutionError
|
||||
preExecutionErrorDetails []any
|
||||
runsOn map[string][]string
|
||||
actionRunStatusChange actions_model.Status
|
||||
}{
|
||||
{
|
||||
name: "not incomplete",
|
||||
|
|
@ -314,12 +331,26 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
|
|||
preExecutionError: actions_model.ErrorCodeIncompleteRunsOnMissingMatrixDimension,
|
||||
preExecutionErrorDetails: []any{"consume-runs-on", "dimension-oops-error"},
|
||||
},
|
||||
{
|
||||
name: "action run completed after expansion",
|
||||
runJobID: 642,
|
||||
consumed: true,
|
||||
runJobNames: []string{
|
||||
"job1",
|
||||
// job2 which expanded into an empty matrix is gone
|
||||
},
|
||||
actionRunStatusChange: actions_model.StatusSuccess,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
defer unittest.OverrideFixtures("services/actions/Test_tryHandleIncompleteMatrix")()
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
notifier := &mockNotifier{}
|
||||
notify_service.RegisterNotifier(notifier)
|
||||
defer notify_service.UnregisterNotifier(notifier)
|
||||
|
||||
blockedJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: tt.runJobID})
|
||||
|
||||
jobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID})
|
||||
|
|
@ -340,6 +371,15 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
|
|||
// expectations are that the ActionRun has an empty PreExecutionError
|
||||
actionRun := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: blockedJob.RunID})
|
||||
assert.EqualValues(t, 0, actionRun.PreExecutionErrorCode)
|
||||
if tt.actionRunStatusChange != 0 {
|
||||
assert.Equal(t, tt.actionRunStatusChange, actionRun.Status)
|
||||
require.Len(t, notifier.calls, 1)
|
||||
call := notifier.calls[0]
|
||||
assert.Equal(t, actionRun.ID, call.run.ID)
|
||||
assert.Nil(t, call.lastRun)
|
||||
assert.Equal(t, actions_model.StatusRunning, call.priorStatus)
|
||||
assert.Equal(t, tt.actionRunStatusChange, call.run.Status)
|
||||
}
|
||||
|
||||
// compare jobs that exist with `runJobNames` to ensure new jobs are inserted:
|
||||
allJobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID})
|
||||
|
|
|
|||
Loading…
Reference in a new issue