From 032b0bbeda9486d8abb16fdfee71bc10a5e3027a Mon Sep 17 00:00:00 2001 From: Mathieu Fenniak Date: Tue, 27 Jan 2026 20:14:04 +0100 Subject: [PATCH] [v14.0/forgejo] fix: empty dynamic matrix can leave action run hanging incomplete (#11072) **Backport:** #11063 Fixes #11030. When a `strategy.matrix` needs to be evaluated on the output of another job, it can become evaluated into an empty set of jobs. In this case, and assuming no other jobs in the run are active, the run should reach a settled state. The logic to check the other jobs in the run and determine if this state has been hit needs to be explicitly added to the job emitter. To accomplish this change, this action run state logic was extracted out of `UpdateRunJobWithoutNotification` where it could be reused. (cherry picked from commit c1980829757a416622c2dd07b809da3b48795de8) Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11072 Reviewed-by: Gusted Co-authored-by: Mathieu Fenniak Co-committed-by: Mathieu Fenniak --- models/actions/run.go | 31 +++++ models/actions/run_job.go | 38 +----- models/actions/run_test.go | 119 ++++++++++++++++++ .../action_run.yml | 19 +++ .../action_run_job.yml | 41 ++++++ .../action_task_output.yml | 5 + services/actions/job_emitter.go | 13 ++ services/actions/job_emitter_test.go | 40 ++++++ 8 files changed, 274 insertions(+), 32 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index 1a53fff216..08fd445091 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -500,4 +500,35 @@ func UpdateRunWithoutNotification(ctx context.Context, run *ActionRun, cols ...s return nil } +// Compute the Status, Started, and Stopped fields of an ActionRun based upon the current job state within the run. +// Returned is the [ActionRun] with modifications if necessary, a slice of column names that have been updated, or an +// error if the calculation failed. The caller is responsible for then invoking [actions_service.UpdateRun] for an +// update with notifications, or [actions_model.UpdateRunWithoutNotification] if notifications are already handled. +func ComputeRunStatus(ctx context.Context, runID int64) (run *ActionRun, columns []string, err error) { + run, err = GetRunByID(ctx, runID) + if err != nil { + return nil, nil, err + } + jobs, err := GetRunJobsByRunID(ctx, runID) + if err != nil { + return nil, nil, err + } + + newStatus := AggregateJobStatus(jobs) + if run.Status != newStatus { + run.Status = newStatus + columns = append(columns, "status") + } + if run.Started.IsZero() && run.Status.IsRunning() { + run.Started = timeutil.TimeStampNow() + columns = append(columns, "started") + } + if run.Stopped.IsZero() && run.Status.IsDone() { + run.Stopped = timeutil.TimeStampNow() + columns = append(columns, "stopped") + } + + return run, columns, nil +} + type ActionRunIndex db.ResourceIndex diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 181ebed0ef..e2ac1c4250 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -174,38 +174,12 @@ func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, con } } - { - // Other goroutines may aggregate the status of the run and update it too. - // So we need load the run and its jobs before updating the run. - run, err := GetRunByID(ctx, job.RunID) - if err != nil { - return 0, err - } - jobs, err := GetRunJobsByRunID(ctx, job.RunID) - if err != nil { - return 0, err - } - - updateRequired := false - newStatus := AggregateJobStatus(jobs) - if run.Status != newStatus { - run.Status = newStatus - updateRequired = true - } - if run.Started.IsZero() && run.Status.IsRunning() { - run.Started = timeutil.TimeStampNow() - updateRequired = true - } - if run.Stopped.IsZero() && run.Status.IsDone() { - run.Stopped = timeutil.TimeStampNow() - updateRequired = true - } - if updateRequired { - // As the caller has to ensure the ActionRunNowDone notification is sent we can ignore doing so here. - if err := UpdateRunWithoutNotification(ctx, run, "status", "started", "stopped"); err != nil { - return 0, fmt.Errorf("update run %d: %w", run.ID, err) - } - } + run, columns, err := ComputeRunStatus(ctx, job.RunID) + if err != nil { + return 0, fmt.Errorf("compute run status: %w", err) + } + if err := UpdateRunWithoutNotification(ctx, run, columns...); err != nil { + return 0, fmt.Errorf("update run %d: %w", run.ID, err) } return affected, nil diff --git a/models/actions/run_test.go b/models/actions/run_test.go index 3a169dd56a..064f3fbcea 100644 --- a/models/actions/run_test.go +++ b/models/actions/run_test.go @@ -272,3 +272,122 @@ jobs: // Expect job with an incomplete runs-on to be StatusBlocked: assert.Equal(t, StatusBlocked, job.Status) } + +func TestComputeRunStatus(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + t.Run("no changes", func(t *testing.T) { + run, columns, err := ComputeRunStatus(t.Context(), 791) + require.NoError(t, err) + assert.Equal(t, StatusSuccess, run.Status) + assert.NotContains(t, columns, "status") + assert.EqualValues(t, 1683636528, run.Started) + assert.NotContains(t, columns, "started") + assert.EqualValues(t, 1683636626, run.Stopped) + assert.NotContains(t, columns, "stopped") + }) + + t.Run("change status", func(t *testing.T) { + job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192}) + job.Status = StatusFailure + affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + run, columns, err := ComputeRunStatus(t.Context(), 791) + require.NoError(t, err) + assert.Equal(t, StatusFailure, run.Status) + assert.Contains(t, columns, "status") + assert.NotContains(t, columns, "started") + assert.NotContains(t, columns, "stopped") + }) + + t.Run("won't change started if not running", func(t *testing.T) { + job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192}) + job.Status = StatusBlocked + affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791}) + preRun.Started = 0 + affected, err = db.GetEngine(t.Context()).Cols("started").ID(preRun.ID).Update(preRun) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + run, columns, err := ComputeRunStatus(t.Context(), 791) + require.NoError(t, err) + assert.Equal(t, StatusBlocked, run.Status) + assert.EqualValues(t, 0, run.Started) + assert.Contains(t, columns, "status") + assert.NotContains(t, columns, "started") + assert.NotContains(t, columns, "stopped") + }) + + t.Run("change started", func(t *testing.T) { + // Need the job to be "Running" for started to appear to change + job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192}) + job.Status = StatusRunning + affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791}) + preRun.Started = 0 + affected, err = db.GetEngine(t.Context()).Cols("started").ID(preRun.ID).Update(preRun) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + run, columns, err := ComputeRunStatus(t.Context(), 791) + require.NoError(t, err) + assert.Equal(t, StatusRunning, run.Status) + assert.NotEqualValues(t, 0, run.Started) + assert.Contains(t, columns, "status") + assert.Contains(t, columns, "started") + assert.NotContains(t, columns, "stopped") + }) + + t.Run("won't change stopped if not done", func(t *testing.T) { + job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192}) + job.Status = StatusRunning + affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791}) + preRun.Stopped = 0 + affected, err = db.GetEngine(t.Context()).Cols("stopped").ID(preRun.ID).Update(preRun) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + run, columns, err := ComputeRunStatus(t.Context(), 791) + require.NoError(t, err) + assert.Equal(t, StatusRunning, run.Status) + assert.EqualValues(t, 0, run.Stopped) + assert.Contains(t, columns, "status") + assert.NotContains(t, columns, "stopped") + }) + + t.Run("change stopped", func(t *testing.T) { + // Need the job to be some version of Done for stopped to appear to change + job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192}) + job.Status = StatusSuccess + affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791}) + preRun.Stopped = 0 + affected, err = db.GetEngine(t.Context()).Cols("stopped").ID(preRun.ID).Update(preRun) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + + run, columns, err := ComputeRunStatus(t.Context(), 791) + require.NoError(t, err) + assert.Equal(t, StatusSuccess, run.Status) + assert.NotEqualValues(t, 0, run.Stopped) + assert.NotContains(t, columns, "status") + assert.NotContains(t, columns, "started") + assert.Contains(t, columns, "stopped") + }) +} diff --git a/services/actions/Test_tryHandleIncompleteMatrix/action_run.yml b/services/actions/Test_tryHandleIncompleteMatrix/action_run.yml index 2bbd44b405..ae70fae37e 100644 --- a/services/actions/Test_tryHandleIncompleteMatrix/action_run.yml +++ b/services/actions/Test_tryHandleIncompleteMatrix/action_run.yml @@ -283,3 +283,22 @@ need_approval: 0 approved_by: 0 concurrency_group: abc123 +- + id: 922 + title: "running workflow_dispatch run" + repo_id: 63 + owner_id: 2 + workflow_id: "running.yaml" + index: 26 + trigger_user_id: 2 + ref: "refs/heads/main" + commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee" + trigger_event: "workflow_dispatch" + is_fork_pull_request: 0 + status: 6 # running + started: 1683636528 + created: 1683636108 + updated: 1683636626 + need_approval: 0 + approved_by: 0 + concurrency_group: abc123 diff --git a/services/actions/Test_tryHandleIncompleteMatrix/action_run_job.yml b/services/actions/Test_tryHandleIncompleteMatrix/action_run_job.yml index a352f54ee6..2dfe87b03d 100644 --- a/services/actions/Test_tryHandleIncompleteMatrix/action_run_job.yml +++ b/services/actions/Test_tryHandleIncompleteMatrix/action_run_job.yml @@ -596,3 +596,44 @@ task_id: 107 status: 1 # success runs_on: '["fedora"]' + +- + id: 642 + run_id: 922 + repo_id: 63 + owner_id: 2 + commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee + is_fork_pull_request: 0 + name: job2 + attempt: 0 + job_id: job2 + task_id: 0 + status: 7 # blocked + runs_on: '[]' + needs: '["job1"]' + workflow_payload: | + on: + push: + branches: [main] + jobs: + job2: + runs-on: default + strategy: + matrix: + name: ${{ fromJSON(needs.job1.outputs.empty-list) }} + steps: + - run: echo "${{ matrix.name }}" + incomplete_matrix: true +- + id: 643 + run_id: 922 + repo_id: 63 + owner_id: 2 + commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee + is_fork_pull_request: 0 + name: job1 + attempt: 0 + job_id: job1 + task_id: 110 + status: 1 # success + runs_on: '["fedora"]' diff --git a/services/actions/Test_tryHandleIncompleteMatrix/action_task_output.yml b/services/actions/Test_tryHandleIncompleteMatrix/action_task_output.yml index 6633ee72ee..424c8ed6ba 100644 --- a/services/actions/Test_tryHandleIncompleteMatrix/action_task_output.yml +++ b/services/actions/Test_tryHandleIncompleteMatrix/action_task_output.yml @@ -48,3 +48,8 @@ task_id: 107 output_key: run-on-this output_value: nixos-25.11 +- + id: 112 + task_id: 110 + output_key: empty-list + output_value: "[]" diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 9484f7aed7..7326369399 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -273,6 +273,19 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac return fmt.Errorf("unexpected record count in delete incomplete_matrix=true job with ID %d; count = %d", blockedJob.ID, count) } + // If len(newJobWorkflows) is 0, and blockedJob was the last job in this run, then the job will be complete -- + // ComputeRunStatus will check for that state. + run, columns, err := actions_model.ComputeRunStatus(ctx, blockedJob.RunID) + if err != nil { + return fmt.Errorf("compute run status: %w", err) + } + if len(columns) != 0 { + err := UpdateRun(ctx, run, columns...) + if err != nil { + return fmt.Errorf("update run: %w", err) + } + } + return nil }) if err != nil { diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 0125e924d1..5ae7965473 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -4,6 +4,7 @@ package actions import ( + "context" "slices" "testing" @@ -11,6 +12,7 @@ import ( "forgejo.org/models/db" "forgejo.org/models/unittest" "forgejo.org/modules/test" + notify_service "forgejo.org/services/notify" "code.forgejo.org/forgejo/runner/v12/act/model" "github.com/stretchr/testify/assert" @@ -142,6 +144,20 @@ jobs: } } +type callArgsActionRunNowDone struct { + run *actions_model.ActionRun + priorStatus actions_model.Status + lastRun *actions_model.ActionRun +} +type mockNotifier struct { + notify_service.NullNotifier + calls []*callArgsActionRunNowDone +} + +func (m *mockNotifier) ActionRunNowDone(ctx context.Context, run *actions_model.ActionRun, priorStatus actions_model.Status, lastRun *actions_model.ActionRun) { + m.calls = append(m.calls, &callArgsActionRunNowDone{run, priorStatus, lastRun}) +} + func Test_tryHandleIncompleteMatrix(t *testing.T) { // Shouldn't get any decoding errors during this test -- pop them up from a log warning to a test fatal error. defer test.MockVariableValue(&model.OnDecodeNodeError, func(node yaml.Node, out any, err error) { @@ -157,6 +173,7 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) { preExecutionError actions_model.PreExecutionError preExecutionErrorDetails []any runsOn map[string][]string + actionRunStatusChange actions_model.Status }{ { name: "not incomplete", @@ -314,12 +331,26 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) { preExecutionError: actions_model.ErrorCodeIncompleteRunsOnMissingMatrixDimension, preExecutionErrorDetails: []any{"consume-runs-on", "dimension-oops-error"}, }, + { + name: "action run completed after expansion", + runJobID: 642, + consumed: true, + runJobNames: []string{ + "job1", + // job2 which expanded into an empty matrix is gone + }, + actionRunStatusChange: actions_model.StatusSuccess, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer unittest.OverrideFixtures("services/actions/Test_tryHandleIncompleteMatrix")() require.NoError(t, unittest.PrepareTestDatabase()) + notifier := &mockNotifier{} + notify_service.RegisterNotifier(notifier) + defer notify_service.UnregisterNotifier(notifier) + blockedJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: tt.runJobID}) jobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID}) @@ -340,6 +371,15 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) { // expectations are that the ActionRun has an empty PreExecutionError actionRun := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: blockedJob.RunID}) assert.EqualValues(t, 0, actionRun.PreExecutionErrorCode) + if tt.actionRunStatusChange != 0 { + assert.Equal(t, tt.actionRunStatusChange, actionRun.Status) + require.Len(t, notifier.calls, 1) + call := notifier.calls[0] + assert.Equal(t, actionRun.ID, call.run.ID) + assert.Nil(t, call.lastRun) + assert.Equal(t, actions_model.StatusRunning, call.priorStatus) + assert.Equal(t, tt.actionRunStatusChange, call.run.Status) + } // compare jobs that exist with `runJobNames` to ensure new jobs are inserted: allJobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID})