diff --git a/models/actions/task_list.go b/models/actions/task_list.go
index fe4c028c2c..712eada378 100644
--- a/models/actions/task_list.go
+++ b/models/actions/task_list.go
@@ -8,6 +8,7 @@ import (
 
 	"forgejo.org/models/db"
 	"forgejo.org/modules/container"
+	"forgejo.org/modules/optional"
 	"forgejo.org/modules/timeutil"
 
 	"xorm.io/builder"
@@ -54,6 +55,8 @@ type FindTaskOptions struct {
 	UpdatedBefore timeutil.TimeStamp
 	StartedBefore timeutil.TimeStamp
 	RunnerID      int64
+	LogExpired    optional.Option[bool]
+	LogInStorage  optional.Option[bool]
 }
 
 func (opts FindTaskOptions) ToConds() builder.Cond {
@@ -79,6 +82,12 @@ func (opts FindTaskOptions) ToConds() builder.Cond {
 	if opts.RunnerID > 0 {
 		cond = cond.And(builder.Eq{"runner_id": opts.RunnerID})
 	}
+	if opts.LogExpired.Has() {
+		cond = cond.And(builder.Eq{"log_expired": opts.LogExpired.Value()})
+	}
+	if opts.LogInStorage.Has() {
+		cond = cond.And(builder.Eq{"log_in_storage": opts.LogInStorage.Value()})
+	}
 	return cond
 }
diff --git a/models/dbfs/dbfile.go b/models/dbfs/dbfile.go
index 12c0398abc..8cd64177dd 100644
--- a/models/dbfs/dbfile.go
+++ b/models/dbfs/dbfile.go
@@ -38,7 +38,7 @@ type file struct {
 
 var _ File = (*file)(nil)
 
-func (f *file) readAt(fileMeta *dbfsMeta, offset int64, p []byte) (n int, err error) {
+func (f *file) readAt(fileMeta *DbfsMeta, offset int64, p []byte) (n int, err error) {
 	if offset >= fileMeta.FileSize {
 		return 0, io.EOF
 	}
@@ -56,7 +56,7 @@ func (f *file) readAt(fileMeta *dbfsMeta, offset int64, p []byte) (n int, err er
 	if needRead <= 0 {
 		return 0, io.EOF
 	}
-	var fileData dbfsData
+	var fileData DbfsData
 	ok, err := db.GetEngine(f.ctx).Where("meta_id = ? AND blob_offset = ?", f.metaID, blobOffset).Get(&fileData)
 	if err != nil {
 		return 0, err
 	}
@@ -129,7 +129,7 @@ func (f *file) Write(p []byte) (n int, err error) {
 		buf = buf[:readBytes]
 	}
 
-	fileData := dbfsData{
+	fileData := DbfsData{
 		MetaID:     fileMeta.ID,
 		BlobOffset: blobOffset,
 		BlobData:   buf,
@@ -152,7 +152,7 @@
 		p = p[needWrite:]
 	}
 
-	fileMetaUpdate := dbfsMeta{
+	fileMetaUpdate := DbfsMeta{
 		ModifyTimestamp: timeToFileTimestamp(time.Now()),
 	}
 	if needUpdateSize {
@@ -216,7 +216,7 @@ func fileTimestampToTime(timestamp int64) time.Time {
 }
 
 func (f *file) loadMetaByPath() error {
-	var fileMeta dbfsMeta
+	var fileMeta DbfsMeta
 	if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil {
 		return err
 	} else if ok {
@@ -278,7 +278,7 @@ func (f *file) createEmpty() error {
 		return os.ErrExist
 	}
 	now := time.Now()
-	_, err := db.GetEngine(f.ctx).Insert(&dbfsMeta{
+	_, err := db.GetEngine(f.ctx).Insert(&DbfsMeta{
 		FullPath:        f.fullPath,
 		BlockSize:       f.blockSize,
 		CreateTimestamp: timeToFileTimestamp(now),
@@ -298,7 +298,7 @@ func (f *file) truncate() error {
 	if _, err := db.GetEngine(ctx).Exec("UPDATE dbfs_meta SET file_size = 0 WHERE id = ?", f.metaID); err != nil {
 		return err
 	}
-	if _, err := db.GetEngine(ctx).Delete(&dbfsData{MetaID: f.metaID}); err != nil {
+	if _, err := db.GetEngine(ctx).Delete(&DbfsData{MetaID: f.metaID}); err != nil {
 		return err
 	}
 	return nil
@@ -323,10 +323,10 @@ func (f *file) delete() error {
 		return os.ErrNotExist
 	}
 	return db.WithTx(f.ctx, func(ctx context.Context) error {
-		if _, err := db.GetEngine(ctx).Delete(&dbfsMeta{ID: f.metaID}); err != nil {
+		if _, err := db.GetEngine(ctx).Delete(&DbfsMeta{ID: f.metaID}); err != nil {
			return err
		}
-		if _, err := db.GetEngine(ctx).Delete(&dbfsData{MetaID: f.metaID}); err != nil {
+		if _, err := db.GetEngine(ctx).Delete(&DbfsData{MetaID: f.metaID}); err != nil {
 			return err
 		}
 		return nil
@@ -344,8 +344,8 @@ func (f *file) size() (int64, error) {
 	return fileMeta.FileSize, nil
 }
 
-func findFileMetaByID(ctx context.Context, metaID int64) (*dbfsMeta, error) {
-	var fileMeta dbfsMeta
+func findFileMetaByID(ctx context.Context, metaID int64) (*DbfsMeta, error) {
+	var fileMeta DbfsMeta
 	if ok, err := db.GetEngine(ctx).Where("id = ?", metaID).Get(&fileMeta); err != nil {
 		return nil, err
 	} else if ok {
diff --git a/models/dbfs/dbfs.go b/models/dbfs/dbfs.go
index ba57e50151..b370930faa 100644
--- a/models/dbfs/dbfs.go
+++ b/models/dbfs/dbfs.go
@@ -42,7 +42,7 @@ The DBFS solution:
 The seeking and finding is not the fastest way, but it's still acceptable and won't affect the performance too much.
 */
 
-type dbfsMeta struct {
+type DbfsMeta struct { //revive:disable-line:exported
 	ID              int64  `xorm:"pk autoincr"`
 	FullPath        string `xorm:"VARCHAR(500) UNIQUE NOT NULL"`
 	BlockSize       int64  `xorm:"BIGINT NOT NULL"`
@@ -51,7 +51,7 @@ type dbfsMeta struct {
 	ModifyTimestamp int64  `xorm:"BIGINT NOT NULL"`
 }
 
-type dbfsData struct {
+type DbfsData struct { //revive:disable-line:exported
 	ID         int64  `xorm:"pk autoincr"`
 	Revision   int64  `xorm:"BIGINT NOT NULL"`
 	MetaID     int64  `xorm:"BIGINT index(meta_offset) NOT NULL"`
@@ -61,8 +61,8 @@ type dbfsData struct {
 }
 
 func init() {
-	db.RegisterModel(new(dbfsMeta))
-	db.RegisterModel(new(dbfsData))
+	db.RegisterModel(new(DbfsMeta))
+	db.RegisterModel(new(DbfsData))
 }
 
 func OpenFile(ctx context.Context, name string, flag int) (File, error) {
@@ -104,28 +104,28 @@ func Remove(ctx context.Context, name string) error {
 	return f.delete()
 }
 
-var _ fs.FileInfo = (*dbfsMeta)(nil)
+var _ fs.FileInfo = (*DbfsMeta)(nil)
 
-func (m *dbfsMeta) Name() string {
+func (m *DbfsMeta) Name() string {
 	return path.Base(m.FullPath)
 }
 
-func (m *dbfsMeta) Size() int64 {
+func (m *DbfsMeta) Size() int64 {
 	return m.FileSize
 }
 
-func (m *dbfsMeta) Mode() fs.FileMode {
+func (m *DbfsMeta) Mode() fs.FileMode {
 	return os.ModePerm
 }
 
-func (m *dbfsMeta) ModTime() time.Time {
+func (m *DbfsMeta) ModTime() time.Time {
 	return fileTimestampToTime(m.ModifyTimestamp)
 }
 
-func (m *dbfsMeta) IsDir() bool {
+func (m *DbfsMeta) IsDir() bool {
 	return false
 }
 
-func (m *dbfsMeta) Sys() any {
+func (m *DbfsMeta) Sys() any {
 	return nil
 }
diff --git a/modules/actions/log.go b/modules/actions/log.go
index cda40b26dc..5df1f9bec3 100644
--- a/modules/actions/log.go
+++ b/modules/actions/log.go
@@ -6,6 +6,7 @@ package actions
 import (
 	"bufio"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -29,6 +30,19 @@ const (
 	defaultBufSize = MaxLineSize
 )
 
+func ExistsLogs(ctx context.Context, filename string) (bool, error) {
+	name := DBFSPrefix + filename
+	f, err := dbfs.Open(ctx, name)
+	if err == nil {
+		f.Close()
+		return true, nil
+	}
+	if errors.Is(err, os.ErrNotExist) {
+		return false, nil
+	}
+	return false, err
+}
+
 // WriteLogs appends logs to DBFS file for temporary storage.
 // It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content.
 // Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage.
@@ -164,6 +178,9 @@ func RemoveLogs(ctx context.Context, inStorage bool, filename string) error {
 	name := DBFSPrefix + filename
 	err := dbfs.Remove(ctx, name)
 	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return nil
+		}
 		return fmt.Errorf("dbfs remove %q: %w", name, err)
 	}
 	return nil
diff --git a/options/locale_next/locale_en-US.json b/options/locale_next/locale_en-US.json
index 7d559f0414..3469671fe3 100644
--- a/options/locale_next/locale_en-US.json
+++ b/options/locale_next/locale_en-US.json
@@ -152,6 +152,7 @@
   "admin.dashboard.cleanup_offline_runners": "Cleanup offline runners",
   "admin.dashboard.remove_resolved_reports": "Remove resolved reports",
   "admin.dashboard.actions_action_user": "Revoke Forgejo Actions trust for inactive users",
+  "admin.dashboard.transfer_lingering_logs": "Transfer logs of finished Actions jobs from the database to storage",
   "admin.config.security": "Security configuration",
   "admin.config.global_2fa_requirement.title": "Global two-factor requirement",
   "admin.config.global_2fa_requirement.none": "No",
diff --git a/services/actions/TestServicesActions_TransferLingeringLogs/action_task.yml b/services/actions/TestServicesActions_TransferLingeringLogs/action_task.yml
new file mode 100644
index 0000000000..aab10191a6
--- /dev/null
+++ b/services/actions/TestServicesActions_TransferLingeringLogs/action_task.yml
@@ -0,0 +1,80 @@
+# all entries will be used with now() == 2024-12-01
+-
+  id: 1000 # lingering log
+  attempt: 3
+  runner_id: 1
+  status: 3 # cancelled
+  repo_id: 4
+  owner_id: 1
+  commit_sha: "1000"
+  is_fork_pull_request: 0
+  token_hash: "1000"
+  log_filename: path1
+  log_in_storage: false
+  log_expired: 0
+  created: 1732575600 # date +%s --date 2024-11-26
+  updated: 1732575605 # a few seconds later
+
+-
+  id: 2000 # lingering log too new to be garbage collected
+  attempt: 3
+  runner_id: 1
+  status: 3 # cancelled
+  repo_id: 4
+  owner_id: 1
+  commit_sha: "2000"
+  is_fork_pull_request: 0
+  token_hash: "2000"
+  log_filename: path2
+  log_in_storage: false
+  log_expired: 0
+  created: 1732921200 # date +%s --date 2024-11-30
+  updated: 1732921205 # a few seconds later
+
+-
+  id: 3000 # log already in storage
+  attempt: 3
+  runner_id: 1
+  status: 3 # cancelled
+  repo_id: 4
+  owner_id: 1
+  commit_sha: "3000"
+  is_fork_pull_request: 0
+  token_hash: "3000"
+  log_filename: path3
+  log_in_storage: true
+  log_expired: 0
+  created: 1732575600 # date +%s --date 2024-11-26
+  updated: 1732575605 # a few seconds later
+
+-
+  id: 4000 # lingering log
+  attempt: 3
+  runner_id: 1
+  status: 3 # cancelled
+  repo_id: 4
+  owner_id: 1
+  commit_sha: "4000"
+  is_fork_pull_request: 0
+  token_hash: "4000"
+  log_filename: path4
+  log_in_storage: false
+  log_expired: 0
+  created: 1732575600 # date +%s --date 2024-11-26
+  updated: 1732575605 # a few seconds later
+
+-
+  id: 5000 # lingering log
+  attempt: 3
+  runner_id: 1
+  status: 3 # cancelled
+  repo_id: 4
+  owner_id: 1
+  commit_sha: "5000"
+  is_fork_pull_request: 0
+  token_hash: "5000"
+  log_filename: path5
+  log_in_storage: false
+  log_expired: 0
+  created: 1732575600 # date +%s --date 2024-11-26
+  updated: 1732575605 # a few seconds later
diff --git a/services/actions/TestServicesActions_TransferLingeringLogs/dbfs_data.yml b/services/actions/TestServicesActions_TransferLingeringLogs/dbfs_data.yml
new file mode 100644
index 0000000000..227dd50f5d
--- /dev/null
+++ b/services/actions/TestServicesActions_TransferLingeringLogs/dbfs_data.yml
@@ -0,0 +1,21 @@
+-
+  id: 1
+  revision: 1
+  meta_id: 1
+  blob_offset: 5
+  blob_size: 5
+ blob_data: "12345" +- + id: 4 + revision: 1 + meta_id: 4 + blob_offset: 5 + blob_size: 5 + blob_data: "12345" +- + id: 5 + revision: 1 + meta_id: 5 + blob_offset: 5 + blob_size: 5 + blob_data: "12345" diff --git a/services/actions/TestServicesActions_TransferLingeringLogs/dbfs_meta.yml b/services/actions/TestServicesActions_TransferLingeringLogs/dbfs_meta.yml new file mode 100644 index 0000000000..ea1355df7c --- /dev/null +++ b/services/actions/TestServicesActions_TransferLingeringLogs/dbfs_meta.yml @@ -0,0 +1,22 @@ +# all entries will be used with now() == 2024-12-01 +- + id: 1 + full_path: 1:actions_log/path1 + block_size: 4096 + file_size: 5 + create_timestamp: 1732057200 # 2024-11-20 + modify_timestamp: 1732057205 # a few seconds after create_timestamp +- + id: 4 + full_path: 1:actions_log/path4 + block_size: 4096 + file_size: 5 + create_timestamp: 1732057200 # 2024-11-20 + modify_timestamp: 1732057205 # a few seconds after create_timestamp +- + id: 5 + full_path: 1:actions_log/path5 + block_size: 4096 + file_size: 5 + create_timestamp: 1732057200 # 2024-11-20 + modify_timestamp: 1732057205 # a few seconds after create_timestamp diff --git a/services/actions/log.go b/services/actions/log.go new file mode 100644 index 0000000000..a4ed22e85a --- /dev/null +++ b/services/actions/log.go @@ -0,0 +1,96 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: GPL-3.0-or-later + +package actions + +import ( + "context" + "fmt" + "time" + + actions_model "forgejo.org/models/actions" + "forgejo.org/models/db" + "forgejo.org/modules/actions" + "forgejo.org/modules/log" + "forgejo.org/modules/optional" + "forgejo.org/modules/timeutil" +) + +var ( + transferLingeringLogsMax = 3000 + transferLingeringLogsSleep = 1 * time.Second + transferLingeringLogsOld = 24 * time.Hour +) + +func TransferLingeringLogs(ctx context.Context) error { + return transferLingeringLogs(ctx, transferLingeringLogsOpts(time.Now())) +} + +func transferLingeringLogsOpts(now time.Time) actions_model.FindTaskOptions { + // performance considerations: the search is linear because + // LogInStorage has no index. But it is bounded by + // LogExpired which is always true for older records and has an index. 
+	return actions_model.FindTaskOptions{
+		Status:       actions_model.DoneStatuses(),
+		LogInStorage: optional.Some(false),
+		LogExpired:   optional.Some(false),
+		// do it after a long delay to avoid any possibility of race with an ongoing operation
+		// as it is not protected by a transaction
+		UpdatedBefore: timeutil.TimeStamp(now.Add(-transferLingeringLogsOld).Unix()),
+	}
+}
+
+func transferLingeringLogs(ctx context.Context, opts actions_model.FindTaskOptions) error {
+	count := 0
+	err := db.Iterate(ctx, opts.ToConds(), func(ctx context.Context, task *actions_model.ActionTask) error {
+		if err := TransferLogsAndUpdateLogInStorage(ctx, task); err != nil {
+			return err
+		}
+		log.Debug("processed task %d", task.ID)
+		count++
+		if count < transferLingeringLogsMax {
+			log.Debug("sleeping %v to not stress the storage", transferLingeringLogsSleep)
+			time.Sleep(transferLingeringLogsSleep)
+		}
+		if count >= transferLingeringLogsMax {
+			return fmt.Errorf("stopped after processing %v tasks and will resume later", transferLingeringLogsMax)
+		}
+		return nil
+	})
+	if count >= transferLingeringLogsMax {
+		log.Info("%v", err)
+		return nil
+	}
+	if count > 0 {
+		log.Info("processed %d tasks", count)
+	}
+	return err
+}
+
+func TransferLogsAndUpdateLogInStorage(ctx context.Context, task *actions_model.ActionTask) error {
+	if task.LogInStorage {
+		return nil
+	}
+	remove, err := TransferLogs(ctx, task.LogFilename)
+	if err != nil {
+		return err
+	}
+	task.LogInStorage = true
+	if err := actions_model.UpdateTask(ctx, task, "log_in_storage"); err != nil {
+		return err
+	}
+	remove()
+
+	return nil
+}
+
+func TransferLogs(ctx context.Context, logFilename string) (func(), error) {
+	exists, err := actions.ExistsLogs(ctx, logFilename)
+	if err != nil {
+		return nil, err
+	}
+	if !exists {
+		return func() {}, nil
+	}
+	return actions.TransferLogs(ctx, logFilename)
+}
diff --git a/services/actions/log_test.go b/services/actions/log_test.go
new file mode 100644
index 0000000000..c6debac5c0
--- /dev/null
+++ b/services/actions/log_test.go
@@ -0,0 +1,74 @@
+// Copyright 2025 The Forgejo Authors. All rights reserved.
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package actions
+
+import (
+	"testing"
+	"time"
+
+	actions_model "forgejo.org/models/actions"
+	dbfs_model "forgejo.org/models/dbfs"
+	"forgejo.org/models/unittest"
+	"forgejo.org/modules/test"
+	"forgejo.org/modules/timeutil"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"xorm.io/builder"
+)
+
+func TestServicesActions_transferLingeringLogs(t *testing.T) {
+	// it would be easier to dynamically create fixtures instead of injecting them
+	// into the database for testing, but the dbfs API does not have what is needed
+	// to create them
+	defer unittest.OverrideFixtures("services/actions/TestServicesActions_TransferLingeringLogs")()
+	require.NoError(t, unittest.PrepareTestDatabase())
+	defer test.MockVariableValue(&transferLingeringLogsMax, 2)()
+	defer test.MockVariableValue(&transferLingeringLogsOld, 2*24*time.Hour)()
+	defer test.MockVariableValue(&transferLingeringLogsSleep, time.Millisecond)()
+
+	now, err := time.Parse("2006-01-02", "2024-12-01")
+	require.NoError(t, err)
+	old := timeutil.TimeStamp(now.Add(-transferLingeringLogsOld).Unix())
+
+	// a task has a lingering log but was updated more recently than
+	// transferLingeringLogsOld
+	recentID := int64(2000)
+	recent := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: recentID}, builder.Eq{"log_in_storage": false})
+	require.Greater(t, recent.Updated, old)
+
+	// a task has logs already in storage but would be garbage collected if it was not
+	inStorageID := int64(3000)
+	inStorage := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: inStorageID}, builder.Eq{"log_in_storage": true})
+	require.Greater(t, old, inStorage.Updated)
+
+	taskWithLingeringLogIDs := []int64{1000, 4000, 5000}
+	for _, taskWithLingeringLogID := range taskWithLingeringLogIDs {
+		lingeringLog := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskWithLingeringLogID}, builder.Eq{"log_in_storage": false})
+		require.Greater(t, old, lingeringLog.Updated)
+	}
+	lingeringLogIDs := []int64{1, 4, 5}
+
+	assert.True(t, unittest.BeanExists(t, &dbfs_model.DbfsMeta{}, builder.In("id", lingeringLogIDs)))
+
+	// first pass transfers logs for transferLingeringLogsMax tasks
+	require.NoError(t, transferLingeringLogs(t.Context(), transferLingeringLogsOpts(now)))
+	assert.True(t, unittest.BeanExists(t, &dbfs_model.DbfsMeta{}, builder.In("id", lingeringLogIDs[transferLingeringLogsMax:])))
+	for _, lingeringLogID := range lingeringLogIDs[:transferLingeringLogsMax] {
+		unittest.AssertNotExistsBean(t, &dbfs_model.DbfsMeta{ID: lingeringLogID})
+	}
+
+	// second pass transfers logs for the remaining tasks and there are none left
+	require.NoError(t, transferLingeringLogs(t.Context(), transferLingeringLogsOpts(now)))
+	for _, lingeringLogID := range lingeringLogIDs {
+		unittest.AssertNotExistsBean(t, &dbfs_model.DbfsMeta{ID: lingeringLogID})
+	}
+
+	// third pass is happily doing nothing
+	require.NoError(t, transferLingeringLogs(t.Context(), transferLingeringLogsOpts(now)))
+
+	// verify the tasks that are not to be garbage collected are still present
+	assert.True(t, unittest.BeanExists(t, &actions_model.ActionTask{ID: recentID}, builder.Eq{"log_in_storage": false}))
+	assert.True(t, unittest.BeanExists(t, &actions_model.ActionTask{ID: inStorageID}, builder.Eq{"log_in_storage": true}))
+}
diff --git a/services/cron/tasks_actions.go b/services/cron/tasks_actions.go
index 25c16bdd8d..eceb19cea8 100644
--- a/services/cron/tasks_actions.go
+++ b/services/cron/tasks_actions.go
@@ -19,6 +19,7 @@ func initActionsTasks() {
 	registerStopZombieTasks()
 	registerStopEndlessTasks()
 	registerCancelAbandonedJobs()
+	registerTransferLingeringLogs()
 	registerScheduleTasks()
 	registerActionsCleanup()
 	registerOfflineRunnersCleanup()
@@ -55,6 +56,16 @@ func registerCancelAbandonedJobs() {
 	})
 }
 
+func registerTransferLingeringLogs() {
+	RegisterTaskFatal("transfer_lingering_logs", &BaseConfig{
+		Enabled:    true,
+		RunAtStart: true,
+		Schedule:   "@midnight",
+	}, func(ctx context.Context, _ *user_model.User, cfg Config) error {
+		return actions_service.TransferLingeringLogs(ctx)
+	})
+}
+
 // registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks.
 func registerScheduleTasks() {
 	// Register the task with a unique name, enabled status, and schedule for every minute.
diff --git a/tests/integration/api_admin_test.go b/tests/integration/api_admin_test.go
index b4faf9d775..e83630c919 100644
--- a/tests/integration/api_admin_test.go
+++ b/tests/integration/api_admin_test.go
@@ -357,11 +357,11 @@ func TestAPICron(t *testing.T) {
 			AddTokenAuth(token)
 		resp := MakeRequest(t, req, http.StatusOK)
 
-		assert.Equal(t, "30", resp.Header().Get("X-Total-Count"))
+		assert.Equal(t, "31", resp.Header().Get("X-Total-Count"))
 
 		var crons []api.Cron
 		DecodeJSON(t, resp, &crons)
-		assert.Len(t, crons, 30)
+		assert.Len(t, crons, 31)
 	})
 
 	t.Run("Execute", func(t *testing.T) {
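
For reviewers, a minimal sketch (not part of the patch) of how the new FindTaskOptions fields compose into a query condition. It only uses identifiers the patch introduces or references (DoneStatuses, optional.Some, ToConds); the main wrapper and the builder.ToSQL rendering are illustrative.

package main

import (
	"fmt"

	actions_model "forgejo.org/models/actions"
	"forgejo.org/modules/optional"

	"xorm.io/builder"
)

func main() {
	// Same filters as transferLingeringLogsOpts: finished tasks whose logs
	// are neither expired nor already transferred to storage.
	opts := actions_model.FindTaskOptions{
		Status:       actions_model.DoneStatuses(),
		LogExpired:   optional.Some(false),
		LogInStorage: optional.Some(false),
	}
	// With the new optional fields set, ToConds ANDs log_expired = false and
	// log_in_storage = false into the condition; the indexed log_expired
	// column is what keeps the otherwise linear scan bounded.
	sql, args, err := builder.ToSQL(opts.ToConds())
	if err != nil {
		panic(err)
	}
	fmt.Println(sql, args)
}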