refactor: decouple resource migrators from accessor (#117743)

* refactor: decouple migrators

* refactor: decouple

* refactor: add shorturl

* fix: tests

* fix: remove count resources
This commit is contained in:
Mustafa Sencer Özcan 2026-02-11 12:23:22 +01:00 committed by GitHub
parent ec4cc4afb2
commit c03d2bd0c3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
34 changed files with 648 additions and 989 deletions

View file

@ -1,194 +0,0 @@
// Code generated by mockery v2.53.4. DO NOT EDIT.
package legacy
import (
context "context"
resourcepb "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
mock "github.com/stretchr/testify/mock"
)
// MockMigrationDashboardAccessor is an autogenerated mock type for the MigrationDashboardAccessor type
//
// NOTE(review): this file is mockery v2.53.4 output ("DO NOT EDIT");
// regenerate via `go generate` rather than editing by hand.
type MockMigrationDashboardAccessor struct {
	mock.Mock
}

// MockMigrationDashboardAccessor_Expecter wraps the underlying mock to offer
// a typed, fluent expectation-setup API.
type MockMigrationDashboardAccessor_Expecter struct {
	mock *mock.Mock
}

// EXPECT returns an expecter for configuring type-safe expectations.
func (_m *MockMigrationDashboardAccessor) EXPECT() *MockMigrationDashboardAccessor_Expecter {
	return &MockMigrationDashboardAccessor_Expecter{mock: &_m.Mock}
}

// CountResources provides a mock function with given fields: ctx, opts
func (_m *MockMigrationDashboardAccessor) CountResources(ctx context.Context, opts MigrateOptions) (*resourcepb.BulkResponse, error) {
	ret := _m.Called(ctx, opts)

	// Panic when no return value was configured: surfaces a missing
	// expectation immediately instead of silently returning zero values.
	if len(ret) == 0 {
		panic("no return value specified for CountResources")
	}

	var r0 *resourcepb.BulkResponse
	var r1 error
	// Resolution order: a single function returning both values, then
	// per-position functions, then statically configured values.
	if rf, ok := ret.Get(0).(func(context.Context, MigrateOptions) (*resourcepb.BulkResponse, error)); ok {
		return rf(ctx, opts)
	}
	if rf, ok := ret.Get(0).(func(context.Context, MigrateOptions) *resourcepb.BulkResponse); ok {
		r0 = rf(ctx, opts)
	} else {
		// Guard the type assertion: a nil configured value stays nil.
		if ret.Get(0) != nil {
			r0 = ret.Get(0).(*resourcepb.BulkResponse)
		}
	}

	if rf, ok := ret.Get(1).(func(context.Context, MigrateOptions) error); ok {
		r1 = rf(ctx, opts)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// MockMigrationDashboardAccessor_CountResources_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CountResources'
type MockMigrationDashboardAccessor_CountResources_Call struct {
	*mock.Call
}

// CountResources is a helper method to define mock.On call
//   - ctx context.Context
//   - opts MigrateOptions
func (_e *MockMigrationDashboardAccessor_Expecter) CountResources(ctx interface{}, opts interface{}) *MockMigrationDashboardAccessor_CountResources_Call {
	return &MockMigrationDashboardAccessor_CountResources_Call{Call: _e.mock.On("CountResources", ctx, opts)}
}

// Run registers a typed callback invoked with the call's arguments.
func (_c *MockMigrationDashboardAccessor_CountResources_Call) Run(run func(ctx context.Context, opts MigrateOptions)) *MockMigrationDashboardAccessor_CountResources_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(MigrateOptions))
	})
	return _c
}

// Return configures the static values the mocked call yields.
func (_c *MockMigrationDashboardAccessor_CountResources_Call) Return(_a0 *resourcepb.BulkResponse, _a1 error) *MockMigrationDashboardAccessor_CountResources_Call {
	_c.Call.Return(_a0, _a1)
	return _c
}

// RunAndReturn configures a function that both handles and answers the call.
func (_c *MockMigrationDashboardAccessor_CountResources_Call) RunAndReturn(run func(context.Context, MigrateOptions) (*resourcepb.BulkResponse, error)) *MockMigrationDashboardAccessor_CountResources_Call {
	_c.Call.Return(run)
	return _c
}

// MigrateDashboards provides a mock function with given fields: ctx, orgId, opts, stream
func (_m *MockMigrationDashboardAccessor) MigrateDashboards(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	ret := _m.Called(ctx, orgId, opts, stream)

	if len(ret) == 0 {
		panic("no return value specified for MigrateDashboards")
	}

	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error); ok {
		r0 = rf(ctx, orgId, opts, stream)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// MockMigrationDashboardAccessor_MigrateDashboards_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MigrateDashboards'
type MockMigrationDashboardAccessor_MigrateDashboards_Call struct {
	*mock.Call
}

// MigrateDashboards is a helper method to define mock.On call
//   - ctx context.Context
//   - orgId int64
//   - opts MigrateOptions
//   - stream resourcepb.BulkStore_BulkProcessClient
func (_e *MockMigrationDashboardAccessor_Expecter) MigrateDashboards(ctx interface{}, orgId interface{}, opts interface{}, stream interface{}) *MockMigrationDashboardAccessor_MigrateDashboards_Call {
	return &MockMigrationDashboardAccessor_MigrateDashboards_Call{Call: _e.mock.On("MigrateDashboards", ctx, orgId, opts, stream)}
}

// Run registers a typed callback invoked with the call's arguments.
func (_c *MockMigrationDashboardAccessor_MigrateDashboards_Call) Run(run func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient)) *MockMigrationDashboardAccessor_MigrateDashboards_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(int64), args[2].(MigrateOptions), args[3].(resourcepb.BulkStore_BulkProcessClient))
	})
	return _c
}

// Return configures the static error the mocked call yields.
func (_c *MockMigrationDashboardAccessor_MigrateDashboards_Call) Return(_a0 error) *MockMigrationDashboardAccessor_MigrateDashboards_Call {
	_c.Call.Return(_a0)
	return _c
}

// RunAndReturn configures a function that both handles and answers the call.
func (_c *MockMigrationDashboardAccessor_MigrateDashboards_Call) RunAndReturn(run func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error) *MockMigrationDashboardAccessor_MigrateDashboards_Call {
	_c.Call.Return(run)
	return _c
}

// MigrateFolders provides a mock function with given fields: ctx, orgId, opts, stream
func (_m *MockMigrationDashboardAccessor) MigrateFolders(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	ret := _m.Called(ctx, orgId, opts, stream)

	if len(ret) == 0 {
		panic("no return value specified for MigrateFolders")
	}

	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error); ok {
		r0 = rf(ctx, orgId, opts, stream)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// MockMigrationDashboardAccessor_MigrateFolders_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MigrateFolders'
type MockMigrationDashboardAccessor_MigrateFolders_Call struct {
	*mock.Call
}

// MigrateFolders is a helper method to define mock.On call
//   - ctx context.Context
//   - orgId int64
//   - opts MigrateOptions
//   - stream resourcepb.BulkStore_BulkProcessClient
func (_e *MockMigrationDashboardAccessor_Expecter) MigrateFolders(ctx interface{}, orgId interface{}, opts interface{}, stream interface{}) *MockMigrationDashboardAccessor_MigrateFolders_Call {
	return &MockMigrationDashboardAccessor_MigrateFolders_Call{Call: _e.mock.On("MigrateFolders", ctx, orgId, opts, stream)}
}

// Run registers a typed callback invoked with the call's arguments.
func (_c *MockMigrationDashboardAccessor_MigrateFolders_Call) Run(run func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient)) *MockMigrationDashboardAccessor_MigrateFolders_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(int64), args[2].(MigrateOptions), args[3].(resourcepb.BulkStore_BulkProcessClient))
	})
	return _c
}

// Return configures the static error the mocked call yields.
func (_c *MockMigrationDashboardAccessor_MigrateFolders_Call) Return(_a0 error) *MockMigrationDashboardAccessor_MigrateFolders_Call {
	_c.Call.Return(_a0)
	return _c
}

// RunAndReturn configures a function that both handles and answers the call.
func (_c *MockMigrationDashboardAccessor_MigrateFolders_Call) RunAndReturn(run func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error) *MockMigrationDashboardAccessor_MigrateFolders_Call {
	_c.Call.Return(run)
	return _c
}

// NewMockMigrationDashboardAccessor creates a new instance of MockMigrationDashboardAccessor. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
// The first argument is typically a *testing.T value.
func NewMockMigrationDashboardAccessor(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockMigrationDashboardAccessor {
	mock := &MockMigrationDashboardAccessor{}
	mock.Mock.Test(t)

	// Fail the test at cleanup time if any configured expectation went unmet.
	t.Cleanup(func() { mock.AssertExpectations(t) })

	return mock
}

View file

@ -1,86 +0,0 @@
// Code generated by mockery v2.53.4. DO NOT EDIT.
package legacy
import (
context "context"
resourcepb "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
mock "github.com/stretchr/testify/mock"
)
// MockPlaylistMigrator is an autogenerated mock type for the PlaylistMigrator type
//
// NOTE(review): mockery v2.53.4 output ("DO NOT EDIT"); regenerate via
// `go generate` rather than editing by hand.
type MockPlaylistMigrator struct {
	mock.Mock
}

// MockPlaylistMigrator_Expecter wraps the underlying mock to offer a typed,
// fluent expectation-setup API.
type MockPlaylistMigrator_Expecter struct {
	mock *mock.Mock
}

// EXPECT returns an expecter for configuring type-safe expectations.
func (_m *MockPlaylistMigrator) EXPECT() *MockPlaylistMigrator_Expecter {
	return &MockPlaylistMigrator_Expecter{mock: &_m.Mock}
}

// MigratePlaylists provides a mock function with given fields: ctx, orgId, opts, stream
func (_m *MockPlaylistMigrator) MigratePlaylists(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	ret := _m.Called(ctx, orgId, opts, stream)

	// Panic when no return value was configured: surfaces a missing
	// expectation immediately instead of silently returning nil.
	if len(ret) == 0 {
		panic("no return value specified for MigratePlaylists")
	}

	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error); ok {
		r0 = rf(ctx, orgId, opts, stream)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// MockPlaylistMigrator_MigratePlaylists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MigratePlaylists'
type MockPlaylistMigrator_MigratePlaylists_Call struct {
	*mock.Call
}

// MigratePlaylists is a helper method to define mock.On call
//   - ctx context.Context
//   - orgId int64
//   - opts MigrateOptions
//   - stream resourcepb.BulkStore_BulkProcessClient
func (_e *MockPlaylistMigrator_Expecter) MigratePlaylists(ctx interface{}, orgId interface{}, opts interface{}, stream interface{}) *MockPlaylistMigrator_MigratePlaylists_Call {
	return &MockPlaylistMigrator_MigratePlaylists_Call{Call: _e.mock.On("MigratePlaylists", ctx, orgId, opts, stream)}
}

// Run registers a typed callback invoked with the call's arguments.
func (_c *MockPlaylistMigrator_MigratePlaylists_Call) Run(run func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient)) *MockPlaylistMigrator_MigratePlaylists_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(int64), args[2].(MigrateOptions), args[3].(resourcepb.BulkStore_BulkProcessClient))
	})
	return _c
}

// Return configures the static error the mocked call yields.
func (_c *MockPlaylistMigrator_MigratePlaylists_Call) Return(_a0 error) *MockPlaylistMigrator_MigratePlaylists_Call {
	_c.Call.Return(_a0)
	return _c
}

// RunAndReturn configures a function that both handles and answers the call.
func (_c *MockPlaylistMigrator_MigratePlaylists_Call) RunAndReturn(run func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error) *MockPlaylistMigrator_MigratePlaylists_Call {
	_c.Call.Return(run)
	return _c
}

// NewMockPlaylistMigrator creates a new instance of MockPlaylistMigrator. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
// The first argument is typically a *testing.T value.
func NewMockPlaylistMigrator(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockPlaylistMigrator {
	mock := &MockPlaylistMigrator{}
	mock.Mock.Test(t)

	// Fail the test at cleanup time if any configured expectation went unmet.
	t.Cleanup(func() { mock.AssertExpectations(t) })

	return mock
}

View file

@ -28,8 +28,6 @@ func mustTemplate(filename string) *template.Template {
var (
sqlQueryDashboards = mustTemplate("query_dashboards.sql")
sqlQueryPanels = mustTemplate("query_panels.sql")
sqlQueryPlaylists = mustTemplate("query_playlists.sql")
sqlQueryShortURLs = mustTemplate("query_shorturls.sql")
)
type sqlQuery struct {
@ -85,53 +83,3 @@ func newLibraryQueryReq(sql *legacysql.LegacyDatabaseHelper, query *LibraryPanel
UserTable: sql.Table("user"),
}
}
// PlaylistQuery filters playlists by organization.
type PlaylistQuery struct {
	OrgID int64
}

// sqlPlaylistQuery is the template context handed to query_playlists.sql,
// carrying the dialect-aware template plus resolved table names.
type sqlPlaylistQuery struct {
	sqltemplate.SQLTemplate
	Query             *PlaylistQuery
	PlaylistTable     string
	PlaylistItemTable string
}

// Validate implements the sqltemplate validation hook; this query has no
// required fields, so it always succeeds.
func (r sqlPlaylistQuery) Validate() error {
	return nil
}

// newPlaylistQueryReq builds the template request, resolving the SQL dialect
// and the (possibly prefixed) playlist table names from the legacy DB helper.
func newPlaylistQueryReq(sql *legacysql.LegacyDatabaseHelper, query *PlaylistQuery) sqlPlaylistQuery {
	return sqlPlaylistQuery{
		SQLTemplate:       sqltemplate.New(sql.DialectForDriver()),
		Query:             query,
		PlaylistTable:     sql.Table("playlist"),
		PlaylistItemTable: sql.Table("playlist_item"),
	}
}
// ShortURLQuery filters short URLs by organization.
type ShortURLQuery struct {
	OrgID int64
}

// sqlShortURLQuery is the template context handed to query_shorturls.sql,
// carrying the dialect-aware template plus the resolved table name.
type sqlShortURLQuery struct {
	sqltemplate.SQLTemplate
	Query         *ShortURLQuery
	ShortURLTable string
}

// Validate implements the sqltemplate validation hook; this query has no
// required fields, so it always succeeds.
func (r sqlShortURLQuery) Validate() error {
	return nil
}

// newShortURLQueryReq builds the template request, resolving the SQL dialect
// and the (possibly prefixed) short_url table name from the legacy DB helper.
func newShortURLQueryReq(sql *legacysql.LegacyDatabaseHelper, query *ShortURLQuery) sqlShortURLQuery {
	return sqlShortURLQuery{
		SQLTemplate:   sqltemplate.New(sql.DialectForDriver()),
		Query:         query,
		ShortURLTable: sql.Table("short_url"),
	}
}

View file

@ -29,18 +29,6 @@ func TestDashboardQueries(t *testing.T) {
return &v
}
getPlaylistQuery := func(q *PlaylistQuery) sqltemplate.SQLTemplate {
v := newPlaylistQueryReq(nodb, q)
v.SQLTemplate = mocks.NewTestingSQLTemplate()
return &v
}
getShortURLQuery := func(q *ShortURLQuery) sqltemplate.SQLTemplate {
v := newShortURLQueryReq(nodb, q)
v.SQLTemplate = mocks.NewTestingSQLTemplate()
return &v
}
mocks.CheckQuerySnapshots(t, mocks.TemplateTestSetup{
RootDir: "testdata",
SQLTemplatesFS: sqlTemplatesFS,
@ -166,22 +154,6 @@ func TestDashboardQueries(t *testing.T) {
}),
},
},
sqlQueryPlaylists: {
{
Name: "list",
Data: getPlaylistQuery(&PlaylistQuery{
OrgID: 1,
}),
},
},
sqlQueryShortURLs: {
{
Name: "list",
Data: getShortURLQuery(&ShortURLQuery{
OrgID: 1,
}),
},
},
},
})
}

View file

@ -1,86 +0,0 @@
// Code generated by mockery v2.53.4. DO NOT EDIT.
package legacy
import (
context "context"
resourcepb "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
mock "github.com/stretchr/testify/mock"
)
// MockShortURLMigrator is an autogenerated mock type for the ShortURLMigrator type
//
// NOTE(review): mockery v2.53.4 output ("DO NOT EDIT"); regenerate via
// `go generate` rather than editing by hand.
type MockShortURLMigrator struct {
	mock.Mock
}

// MockShortURLMigrator_Expecter wraps the underlying mock to offer a typed,
// fluent expectation-setup API.
type MockShortURLMigrator_Expecter struct {
	mock *mock.Mock
}

// EXPECT returns an expecter for configuring type-safe expectations.
func (_m *MockShortURLMigrator) EXPECT() *MockShortURLMigrator_Expecter {
	return &MockShortURLMigrator_Expecter{mock: &_m.Mock}
}

// MigrateShortURLs provides a mock function with given fields: ctx, orgId, opts, stream
func (_m *MockShortURLMigrator) MigrateShortURLs(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	ret := _m.Called(ctx, orgId, opts, stream)

	// Panic when no return value was configured: surfaces a missing
	// expectation immediately instead of silently returning nil.
	if len(ret) == 0 {
		panic("no return value specified for MigrateShortURLs")
	}

	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error); ok {
		r0 = rf(ctx, orgId, opts, stream)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// MockShortURLMigrator_MigrateShortURLs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MigrateShortURLs'
type MockShortURLMigrator_MigrateShortURLs_Call struct {
	*mock.Call
}

// MigrateShortURLs is a helper method to define mock.On call
//   - ctx context.Context
//   - orgId int64
//   - opts MigrateOptions
//   - stream resourcepb.BulkStore_BulkProcessClient
func (_e *MockShortURLMigrator_Expecter) MigrateShortURLs(ctx interface{}, orgId interface{}, opts interface{}, stream interface{}) *MockShortURLMigrator_MigrateShortURLs_Call {
	return &MockShortURLMigrator_MigrateShortURLs_Call{Call: _e.mock.On("MigrateShortURLs", ctx, orgId, opts, stream)}
}

// Run registers a typed callback invoked with the call's arguments.
func (_c *MockShortURLMigrator_MigrateShortURLs_Call) Run(run func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient)) *MockShortURLMigrator_MigrateShortURLs_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(int64), args[2].(MigrateOptions), args[3].(resourcepb.BulkStore_BulkProcessClient))
	})
	return _c
}

// Return configures the static error the mocked call yields.
func (_c *MockShortURLMigrator_MigrateShortURLs_Call) Return(_a0 error) *MockShortURLMigrator_MigrateShortURLs_Call {
	_c.Call.Return(_a0)
	return _c
}

// RunAndReturn configures a function that both handles and answers the call.
func (_c *MockShortURLMigrator_MigrateShortURLs_Call) RunAndReturn(run func(context.Context, int64, MigrateOptions, resourcepb.BulkStore_BulkProcessClient) error) *MockShortURLMigrator_MigrateShortURLs_Call {
	_c.Call.Return(run)
	return _c
}

// NewMockShortURLMigrator creates a new instance of MockShortURLMigrator. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
// The first argument is typically a *testing.T value.
func NewMockShortURLMigrator(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockShortURLMigrator {
	mock := &MockShortURLMigrator{}
	mock.Mock.Test(t)

	// Fail the test at cleanup time if any configured expectation went unmet.
	t.Cleanup(func() { mock.AssertExpectations(t) })

	return mock
}

View file

@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/otel"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/ptr"
claims "github.com/grafana/authlib/types"
@ -23,9 +22,6 @@ import (
dashboardV0 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
dashboardV1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1"
"github.com/grafana/grafana/apps/dashboard/pkg/migration/schemaversion"
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
playlistv1 "github.com/grafana/grafana/apps/playlist/pkg/apis/playlist/v1"
shorturlv1beta1 "github.com/grafana/grafana/apps/shorturl/pkg/apis/shorturl/v1beta1"
"github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
@ -43,6 +39,7 @@ import (
"github.com/grafana/grafana/pkg/services/search/sort"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
@ -52,14 +49,6 @@ var (
tracer = otel.Tracer("github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy")
)
type MigrateOptions struct {
Namespace string
Resources []schema.GroupResource
WithHistory bool // only applies to dashboards
OnlyCount bool // just count the values
Progress func(count int, msg string)
}
type dashboardRow struct {
// The numeric version for this dashboard
RV int64
@ -94,39 +83,15 @@ type dashboardSqlAccess struct {
log log.Logger
}
// ProvideMigratorDashboardAccessor creates a DashboardAccess specifically for migration purposes.
// This provider is used by Wire DI and only includes the minimal dependencies needed for migrations.
func ProvideMigratorDashboardAccessor(
func ProvideMigrator(
sql legacysql.LegacyDatabaseProvider,
provisioning provisioning.StubProvisioningService,
accessControl accesscontrol.AccessControl,
) MigrationDashboardAccessor {
return newMigratorAccess(sql, provisioning, accessControl)
) Migrator {
return NewMigratorAccess(sql, provisioning, accessControl)
}
// ProvidePlaylistMigrator creates a PlaylistMigrator for migration purposes.
// This is wired separately from MigrationDashboardAccessor so that playlist
// migrations are decoupled from the dashboard accessor interface.
func ProvidePlaylistMigrator(
sql legacysql.LegacyDatabaseProvider,
provisioning provisioning.StubProvisioningService,
accessControl accesscontrol.AccessControl,
) PlaylistMigrator {
return newMigratorAccess(sql, provisioning, accessControl)
}
// ProvideShortURLMigrator creates a ShortURLMigrator for migration purposes.
// This is wired separately from MigrationDashboardAccessor so that short URL
// migrations are decoupled from the dashboard accessor interface.
func ProvideShortURLMigrator(
sql legacysql.LegacyDatabaseProvider,
provisioning provisioning.StubProvisioningService,
accessControl accesscontrol.AccessControl,
) ShortURLMigrator {
return newMigratorAccess(sql, provisioning, accessControl)
}
func newMigratorAccess(
func NewMigratorAccess(
sql legacysql.LegacyDatabaseProvider,
provisioning provisioning.StubProvisioningService,
accessControl accesscontrol.AccessControl,
@ -232,75 +197,8 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, helper *legacysql.Lega
}, err
}
// CountResources counts resources without migrating them.
//
// It resolves the org from opts.Namespace, then for each requested
// group/resource runs a COUNT query against the matching legacy table.
// Group/resource pairs outside the known set are silently skipped.
// The returned BulkResponse carries one Summary per counted resource.
func (a *dashboardSqlAccess) CountResources(ctx context.Context, opts MigrateOptions) (*resourcepb.BulkResponse, error) {
	sql, err := a.sql(ctx)
	if err != nil {
		return nil, err
	}
	ns, err := claims.ParseNamespace(opts.Namespace)
	if err != nil {
		return nil, err
	}
	orgId := ns.OrgID

	rsp := &resourcepb.BulkResponse{}
	err = sql.DB.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
		for _, res := range opts.Resources {
			switch fmt.Sprintf("%s/%s", res.Group, res.Resource) {
			case "folder.grafana.app/folders":
				// Folders live in the dashboard table, flagged is_folder=TRUE.
				summary := &resourcepb.BulkResponse_Summary{}
				summary.Group = folders.GROUP
				summary.Resource = folders.RESOURCE
				_, err = sess.SQL("SELECT COUNT(*) FROM "+sql.Table("dashboard")+
					" WHERE is_folder=TRUE AND org_id=?", orgId).Get(&summary.Count)
				rsp.Summary = append(rsp.Summary, summary)

			case "dashboard.grafana.app/librarypanels":
				summary := &resourcepb.BulkResponse_Summary{}
				summary.Group = dashboardV1.GROUP
				summary.Resource = dashboardV1.LIBRARY_PANEL_RESOURCE
				_, err = sess.SQL("SELECT COUNT(*) FROM "+sql.Table("library_element")+
					" WHERE org_id=?", orgId).Get(&summary.Count)
				rsp.Summary = append(rsp.Summary, summary)

			case "dashboard.grafana.app/dashboards":
				summary := &resourcepb.BulkResponse_Summary{}
				summary.Group = dashboardV1.GROUP
				summary.Resource = dashboardV1.DASHBOARD_RESOURCE
				rsp.Summary = append(rsp.Summary, summary)

				_, err = sess.SQL("SELECT COUNT(*) FROM "+sql.Table("dashboard")+
					" WHERE is_folder=FALSE AND org_id=?", orgId).Get(&summary.Count)
				if err != nil {
					return err
				}

				// Also count history
				_, err = sess.SQL(`SELECT COUNT(*)
					FROM `+sql.Table("dashboard_version")+` as dv
					JOIN `+sql.Table("dashboard")+` as dd
					ON dd.id = dv.dashboard_id
					WHERE org_id=?`, orgId).Get(&summary.History)

			case "shorturl.grafana.app/shorturls":
				summary := &resourcepb.BulkResponse_Summary{}
				summary.Group = shorturlv1beta1.APIGroup
				summary.Resource = "shorturls"
				_, err = sess.SQL("SELECT COUNT(*) FROM "+sql.Table("short_url")+
					" WHERE org_id=?", orgId).Get(&summary.Count)
				rsp.Summary = append(rsp.Summary, summary)
			}

			// One shared err check per iteration covers the cases above.
			if err != nil {
				return err
			}
		}
		return nil
	})
	// BUG FIX: the session error was previously discarded (`return rsp, nil`),
	// so a failing COUNT query was reported as success with partial or zero
	// counts. Propagate it instead.
	if err != nil {
		return nil, err
	}
	return rsp, nil
}
// MigrateDashboards handles the dashboard migration logic
func (a *dashboardSqlAccess) MigrateDashboards(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
func (a *dashboardSqlAccess) MigrateDashboards(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
query := &DashboardQuery{
OrgID: orgId,
Limit: 100000000,
@ -392,7 +290,7 @@ func (a *dashboardSqlAccess) MigrateDashboards(ctx context.Context, orgId int64,
}
// MigrateFolders handles the folder migration logic
func (a *dashboardSqlAccess) MigrateFolders(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
func (a *dashboardSqlAccess) MigrateFolders(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
query := &DashboardQuery{
OrgID: orgId,
Limit: 100000000,
@ -475,7 +373,7 @@ func (a *dashboardSqlAccess) MigrateFolders(ctx context.Context, orgId int64, op
}
// MigrateLibraryPanels handles the library panel migration logic
func (a *dashboardSqlAccess) MigrateLibraryPanels(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
func (a *dashboardSqlAccess) MigrateLibraryPanels(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
opts.Progress(-1, "migrating library panels...")
panels, err := a.GetLibraryPanels(ctx, LibraryPanelQuery{
OrgID: orgId,
@ -523,137 +421,6 @@ func (a *dashboardSqlAccess) MigrateLibraryPanels(ctx context.Context, orgId int
return nil
}
// MigratePlaylists handles the playlist migration logic
//
// It streams playlist rows (one row per playlist item, via LEFT JOIN) from
// legacy SQL storage, groups items by playlist while preserving row order,
// converts each playlist into its K8s resource form, and sends them as ADDED
// bulk requests on the stream. Progress is reported via opts.Progress
// (-1 = starting, -2 = finished, >=0 = running item count).
func (a *dashboardSqlAccess) MigratePlaylists(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	opts.Progress(-1, "migrating playlists...")
	rows, err := a.ListPlaylists(ctx, orgId)
	// Close rows even when ListPlaylists also returned an error.
	if rows != nil {
		defer func() {
			_ = rows.Close()
		}()
	}
	if err != nil {
		return err
	}

	// Group playlist items by playlist ID while preserving order
	type playlistData struct {
		id        int64
		uid       string
		name      string
		interval  string
		items     []playlistv1.PlaylistItem
		createdAt int64
		updatedAt int64
	}
	playlistIndex := make(map[int64]int) // maps playlist ID to index in playlists slice
	playlists := []*playlistData{}

	// Scan targets reused across rows.
	var currentID int64
	var orgID int64
	var uid, name, interval string
	var createdAt, updatedAt int64
	var itemType, itemValue sql.NullString
	count := 0

	for rows.Next() {
		err = rows.Scan(&currentID, &orgID, &uid, &name, &interval, &createdAt, &updatedAt, &itemType, &itemValue)
		if err != nil {
			return err
		}

		// Get or create playlist entry
		idx, exists := playlistIndex[currentID]
		var pl *playlistData
		if !exists {
			pl = &playlistData{
				id:        currentID,
				uid:       uid,
				name:      name,
				interval:  interval,
				items:     []playlistv1.PlaylistItem{},
				createdAt: createdAt,
				updatedAt: updatedAt,
			}
			playlistIndex[currentID] = len(playlists)
			playlists = append(playlists, pl)
		} else {
			pl = playlists[idx]
		}

		// Add item if it exists (LEFT JOIN can return NULL for playlists without items)
		if itemType.Valid && itemValue.Valid {
			pl.items = append(pl.items, playlistv1.PlaylistItem{
				Type:  playlistv1.PlaylistPlaylistItemType(itemType.String),
				Value: itemValue.String,
			})
		}
	}
	// Surface any iteration error after the loop (bufio/sql idiom).
	if err = rows.Err(); err != nil {
		return err
	}

	// Convert to K8s objects and send to stream (order is preserved)
	for _, pl := range playlists {
		playlist := &playlistv1.Playlist{
			TypeMeta: metav1.TypeMeta{
				APIVersion: playlistv1.GroupVersion.String(),
				Kind:       "Playlist",
			},
			ObjectMeta: metav1.ObjectMeta{
				Name:              pl.uid,
				Namespace:         opts.Namespace,
				CreationTimestamp: metav1.NewTime(time.UnixMilli(pl.createdAt)),
			},
			Spec: playlistv1.PlaylistSpec{
				Title:    pl.name,
				Interval: pl.interval,
				Items:    pl.items,
			},
		}

		// Set updated timestamp if different from created
		if pl.updatedAt != pl.createdAt {
			meta, err := utils.MetaAccessor(playlist)
			if err != nil {
				return err
			}
			updatedTime := time.UnixMilli(pl.updatedAt)
			meta.SetUpdatedTimestamp(&updatedTime)
		}

		body, err := json.Marshal(playlist)
		if err != nil {
			return err
		}

		req := &resourcepb.BulkRequest{
			Key: &resourcepb.ResourceKey{
				Namespace: opts.Namespace,
				Group:     "playlist.grafana.app",
				Resource:  "playlists",
				Name:      pl.uid,
			},
			Value:  body,
			Action: resourcepb.BulkRequest_ADDED,
		}

		opts.Progress(count, fmt.Sprintf("%s (%d)", pl.name, len(req.Value)))
		count++

		err = stream.Send(req)
		if err != nil {
			// EOF means the receiver closed the stream cleanly; treat as done.
			if errors.Is(err, io.EOF) {
				err = nil
			}
			return err
		}
	}

	opts.Progress(-2, fmt.Sprintf("finished playlists... (%d)", len(playlists)))
	return nil
}
var _ resource.ListIterator = (*rowsWrapper)(nil)
type rowsWrapper struct {
@ -1348,140 +1115,3 @@ func parseLibraryPanelRow(p panel) (dashboardV0.LibraryPanel, error) {
func (b *dashboardSqlAccess) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest) (*resourcepb.RebuildIndexesResponse, error) {
return nil, fmt.Errorf("not implemented")
}
// ListPlaylists returns raw playlist rows for the given org, rendered from
// the query_playlists.sql template. The result appears to be one row per
// playlist item (see MigratePlaylists' scan) — confirm against the template.
// Callers own the returned rows and must Close them.
func (a *dashboardSqlAccess) ListPlaylists(ctx context.Context, orgID int64) (*sql.Rows, error) {
	ctx, span := tracer.Start(ctx, "legacy.dashboardSqlAccess.ListPlaylists")
	defer span.End()

	helper, err := a.sql(ctx)
	if err != nil {
		return nil, err
	}

	req := newPlaylistQueryReq(helper, &PlaylistQuery{
		OrgID: orgID,
	})
	rawQuery, err := sqltemplate.Execute(sqlQueryPlaylists, req)
	if err != nil {
		return nil, fmt.Errorf("execute template %q: %w", sqlQueryPlaylists.Name(), err)
	}

	rows, err := a.executeQuery(ctx, helper, rawQuery, req.GetArgs()...)
	// On error with a non-nil result set, close it here so the underlying
	// connection is not leaked; otherwise ownership passes to the caller.
	if err != nil && rows != nil {
		_ = rows.Close()
		return nil, err
	}
	return rows, err
}
// ListShortURLs returns raw short-URL rows for the given org, rendered from
// the query_shorturls.sql template. Callers own the returned rows and must
// Close them.
func (a *dashboardSqlAccess) ListShortURLs(ctx context.Context, orgID int64) (*sql.Rows, error) {
	ctx, span := tracer.Start(ctx, "legacy.dashboardSqlAccess.ListShortURLs")
	defer span.End()

	helper, sqlErr := a.sql(ctx)
	if sqlErr != nil {
		return nil, sqlErr
	}

	tmplReq := newShortURLQueryReq(helper, &ShortURLQuery{OrgID: orgID})
	rendered, tmplErr := sqltemplate.Execute(sqlQueryShortURLs, tmplReq)
	if tmplErr != nil {
		return nil, fmt.Errorf("execute template %q: %w", sqlQueryShortURLs.Name(), tmplErr)
	}

	rows, queryErr := a.executeQuery(ctx, helper, rendered, tmplReq.GetArgs()...)
	if queryErr != nil {
		// A failed query may still hand back a result set; close it here so
		// the underlying connection is not leaked.
		if rows != nil {
			_ = rows.Close()
			return nil, queryErr
		}
	}
	return rows, queryErr
}
// MigrateShortURLs handles the short URL migration logic
//
// It streams short-URL rows from legacy SQL storage, converts each to its
// K8s resource form (including creator identity and last-seen status), and
// sends them as ADDED bulk requests on the stream. Progress is reported via
// opts.Progress (-1 = starting, -2 = finished, >=0 = running item count).
func (a *dashboardSqlAccess) MigrateShortURLs(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	opts.Progress(-1, "migrating short URLs...")
	rows, err := a.ListShortURLs(ctx, orgId)
	// Close rows even when ListShortURLs also returned an error.
	if rows != nil {
		defer func() {
			_ = rows.Close()
		}()
	}
	if err != nil {
		return err
	}

	// Scan targets reused across rows.
	var id int64
	var orgID int64
	var uid, path string
	var createdBy int64
	var createdAt int64
	var lastSeenAt int64
	count := 0

	for rows.Next() {
		err = rows.Scan(&id, &orgID, &uid, &path, &createdBy, &createdAt, &lastSeenAt)
		if err != nil {
			return err
		}

		// NOTE(review): createdAt is interpreted as seconds here
		// (time.Unix), while playlists use milliseconds — confirm the
		// legacy column units.
		shortURL := &shorturlv1beta1.ShortURL{
			TypeMeta: metav1.TypeMeta{
				APIVersion: shorturlv1beta1.GroupVersion.String(),
				Kind:       "ShortURL",
			},
			ObjectMeta: metav1.ObjectMeta{
				Name:              uid,
				Namespace:         opts.Namespace,
				CreationTimestamp: metav1.NewTime(time.Unix(createdAt, 0)),
			},
			Spec: shorturlv1beta1.ShortURLSpec{
				Path: path,
			},
			Status: shorturlv1beta1.ShortURLStatus{
				LastSeenAt: lastSeenAt,
			},
		}

		// Only attribute a creator for real user IDs (> 0).
		if createdBy > 0 {
			shortURL.SetCreatedBy(claims.NewTypeID(claims.TypeUser, strconv.FormatInt(createdBy, 10)))
		}

		body, err := json.Marshal(shortURL)
		if err != nil {
			return err
		}

		req := &resourcepb.BulkRequest{
			Key: &resourcepb.ResourceKey{
				Namespace: opts.Namespace,
				Group:     shorturlv1beta1.APIGroup,
				Resource:  "shorturls",
				Name:      uid,
			},
			Value:  body,
			Action: resourcepb.BulkRequest_ADDED,
		}

		opts.Progress(count, fmt.Sprintf("%s (%d)", uid, len(req.Value)))
		count++

		err = stream.Send(req)
		if err != nil {
			// EOF means the receiver closed the stream cleanly; treat as done.
			if errors.Is(err, io.EOF) {
				err = nil
			}
			return err
		}
	}
	// Surface any iteration error after the loop.
	if err = rows.Err(); err != nil {
		return err
	}

	opts.Progress(-2, fmt.Sprintf("finished short URLs... (%d)", count))
	return nil
}

View file

@ -22,6 +22,7 @@ import (
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate/mocks"
)
@ -637,7 +638,7 @@ func TestMigrateDashboardsConfiguration(t *testing.T) {
t.Run("Migration options should configure query correctly", func(t *testing.T) {
// Test the migration configuration as used in real migration
opts := MigrateOptions{
opts := migrations.MigrateOptions{
WithHistory: true, // Migration includes history
}

View file

@ -5,6 +5,7 @@ import (
dashboardV0 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
dashboardV1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
@ -72,19 +73,8 @@ type DashboardAccessor interface {
GetLibraryPanels(ctx context.Context, query LibraryPanelQuery) (*dashboardV0.LibraryPanelList, error)
}
//go:generate mockery --name MigrationDashboardAccessor --structname MockMigrationDashboardAccessor --inpackage --filename migration_dashboard_accessor_mock.go --with-expecter
type MigrationDashboardAccessor interface {
CountResources(ctx context.Context, opts MigrateOptions) (*resourcepb.BulkResponse, error)
MigrateDashboards(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
MigrateFolders(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
}
//go:generate mockery --name PlaylistMigrator --structname MockPlaylistMigrator --inpackage --filename playlist_migrator_mock.go --with-expecter
type PlaylistMigrator interface {
MigratePlaylists(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
}
//go:generate mockery --name ShortURLMigrator --structname MockShortURLMigrator --inpackage --filename shorturl_migrator_mock.go --with-expecter
type ShortURLMigrator interface {
MigrateShortURLs(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
type Migrator interface {
MigrateDashboards(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
MigrateFolders(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
MigrateLibraryPanels(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
}

View file

@ -3,17 +3,12 @@ package dashboard
import (
v1beta1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1"
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/migrator"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"k8s.io/apimachinery/pkg/runtime/schema"
)
/*
FoldersDashboardsMigration returns the migration definition for folders and dashboards.
This is owned by the dashboard team and uses the MigrationDashboardAccessor
to stream folder and dashboard resources from legacy SQL storage.
*/
func FoldersDashboardsMigration(accessor legacy.MigrationDashboardAccessor) migrations.MigrationDefinition {
func FoldersDashboardsMigration(migrator migrator.FoldersDashboardsMigrator) migrations.MigrationDefinition {
folderGR := schema.GroupResource{Group: folders.GROUP, Resource: folders.RESOURCE}
dashboardGR := schema.GroupResource{Group: v1beta1.GROUP, Resource: v1beta1.DASHBOARD_RESOURCE}
@ -25,8 +20,8 @@ func FoldersDashboardsMigration(accessor legacy.MigrationDashboardAccessor) migr
{GroupResource: dashboardGR, LockTable: "dashboard"},
},
Migrators: map[schema.GroupResource]migrations.MigratorFunc{
folderGR: accessor.MigrateFolders,
dashboardGR: accessor.MigrateDashboards,
folderGR: migrator.MigrateFolders,
dashboardGR: migrator.MigrateDashboards,
},
Validators: []migrations.ValidatorFactory{
migrations.CountValidation(folderGR, "dashboard", "org_id = ? AND is_folder = true AND deleted IS NULL"),

View file

@ -0,0 +1,47 @@
package migrator
import (
"context"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
// FoldersDashboardsMigrator migrates folders and dashboards from legacy SQL
// storage into unified storage via the bulk process stream.
type FoldersDashboardsMigrator interface {
	MigrateDashboards(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
	MigrateFolders(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
}
// foldersDashboardsMigrator handles migrating dashboards, folders, and library panels
// from legacy SQL storage.
//
// It is a thin wrapper that delegates every call to the underlying legacy.Migrator.
type foldersDashboardsMigrator struct {
	migrator legacy.Migrator
}
// ProvideFoldersDashboardsMigrator is the wire DI provider for the
// folders/dashboards migrator, wrapping the given legacy migrator.
func ProvideFoldersDashboardsMigrator(migrator legacy.Migrator) FoldersDashboardsMigrator {
	return &foldersDashboardsMigrator{migrator: migrator}
}
// MigrateDashboards reads dashboards from legacy SQL storage and streams them
// to the unified storage bulk process API. Pure delegation to the wrapped
// legacy migrator.
func (m *foldersDashboardsMigrator) MigrateDashboards(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	return m.migrator.MigrateDashboards(ctx, orgId, opts, stream)
}

// MigrateFolders reads folders from legacy SQL storage and streams them
// to the unified storage bulk process API. Pure delegation to the wrapped
// legacy migrator.
func (m *foldersDashboardsMigrator) MigrateFolders(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	return m.migrator.MigrateFolders(ctx, orgId, opts, stream)
}

// MigrateLibraryPanels reads library panels from legacy SQL storage and streams them
// to the unified storage bulk process API. Pure delegation to the wrapped
// legacy migrator.
//
// NOTE(review): this method is not declared on the FoldersDashboardsMigrator
// interface in this file — confirm whether callers need it exposed there.
func (m *foldersDashboardsMigrator) MigrateLibraryPanels(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	return m.migrator.MigrateLibraryPanels(ctx, orgId, opts, stream)
}

View file

@ -44,7 +44,6 @@ import (
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/infra/usagestats"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/controller"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
deletepkg "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/delete"
@ -115,7 +114,6 @@ type APIBuilder struct {
jobHistoryConfig *JobHistoryConfig
jobHistoryLoki *jobs.LokiJobHistory
resourceLister resources.ResourceLister
dashboardAccess legacy.MigrationDashboardAccessor
unified resource.ResourceClient
repoFactory repository.Factory
connectionFactory connection.Factory
@ -146,7 +144,6 @@ func NewAPIBuilder(
features featuremgmt.FeatureToggles,
unified resource.ResourceClient,
configProvider apiserver.RestConfigProvider,
dashboardAccess legacy.MigrationDashboardAccessor,
storageStatus dualwrite.Service,
usageStats usagestats.Service,
access authlib.AccessChecker,
@ -193,7 +190,6 @@ func NewAPIBuilder(
parsers: parsers,
repositoryResources: resources.NewRepositoryResourcesFactory(parsers, clients, resourceLister),
resourceLister: resourceLister,
dashboardAccess: dashboardAccess,
unified: unified,
access: accessChecker,
accessWithAdmin: accessChecker.WithFallbackRole(identity.RoleAdmin),
@ -268,7 +264,6 @@ func RegisterAPIService(
client resource.ResourceClient, // implements resource.RepositoryClient
configProvider apiserver.RestConfigProvider,
access authlib.AccessClient,
dashboardAccess legacy.MigrationDashboardAccessor,
storageStatus dualwrite.Service,
usageStats usagestats.Service,
tracer tracing.Tracer,
@ -303,7 +298,7 @@ func RegisterAPIService(
features,
client,
configProvider,
dashboardAccess, storageStatus,
storageStatus,
usageStats,
access,
tracer,

View file

@ -2,17 +2,12 @@ package playlist
import (
playlists "github.com/grafana/grafana/apps/playlist/pkg/apis/playlist/v1"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
migrator "github.com/grafana/grafana/pkg/registry/apps/playlist/migrator"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"k8s.io/apimachinery/pkg/runtime/schema"
)
/*
PlaylistMigration returns the migration definition for playlists.
It lives in the playlist package so the playlist team owns their migration
definition, decoupled from the dashboard accessor.
*/
func PlaylistMigration(migrator legacy.PlaylistMigrator) migrations.MigrationDefinition {
func PlaylistMigration(migrator migrator.PlaylistMigrator) migrations.MigrationDefinition {
playlistGR := schema.GroupResource{Group: playlists.APIGroup, Resource: "playlists"}
return migrations.MigrationDefinition{

View file

@ -0,0 +1,219 @@
package playlist
import (
"context"
"database/sql"
"embed"
"encoding/json"
"errors"
"fmt"
"io"
"text/template"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
playlistv1 "github.com/grafana/grafana/apps/playlist/pkg/apis/playlist/v1"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
//go:embed query_playlists.sql
var playlistSQLTemplatesFS embed.FS

// sqlQueryPlaylists is the parsed playlist listing SQL template; Must panics
// at package init if the embedded file fails to parse.
var sqlQueryPlaylists = template.Must(
	template.New("sql").ParseFS(playlistSQLTemplatesFS, "query_playlists.sql"),
).Lookup("query_playlists.sql")
// PlaylistMigrator migrates playlists from legacy SQL storage into unified
// storage via the bulk process stream.
type PlaylistMigrator interface {
	MigratePlaylists(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
}

// playlistMigrator handles migrating playlists from legacy SQL storage.
type playlistMigrator struct {
	// sql resolves the legacy database helper (dialect + table naming) on demand.
	sql legacysql.LegacyDatabaseProvider
}
// ProvidePlaylistMigrator is the wire DI provider for the playlist migrator.
func ProvidePlaylistMigrator(sql legacysql.LegacyDatabaseProvider) PlaylistMigrator {
	m := &playlistMigrator{
		sql: sql,
	}
	return m
}
// MigratePlaylists reads playlists from legacy SQL storage and streams them as
// Kubernetes resources to the unified storage bulk process API.
//
// The query returns one row per (playlist, item) pair; rows are grouped under
// their parent playlist in arrival order, then one BulkRequest is sent per
// playlist. Progress is reported via opts.Progress: -1 is emitted at the
// start of the phase and -2 at the end.
func (m *playlistMigrator) MigratePlaylists(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	opts.Progress(-1, "migrating playlists...")

	rows, err := m.listPlaylists(ctx, orgId)
	// Close rows even when listPlaylists also returned an error alongside them.
	if rows != nil {
		defer func() {
			_ = rows.Close()
		}()
	}
	if err != nil {
		return err
	}

	// Group playlist items by playlist ID while preserving order
	type playlistData struct {
		id        int64
		uid       string
		name      string
		interval  string
		items     []playlistv1.PlaylistItem
		createdAt int64
		updatedAt int64
	}
	playlistIndex := make(map[int64]int) // maps playlist ID to index in playlists slice
	playlists := []*playlistData{}

	// Scan targets reused across rows.
	var currentID int64
	var orgID int64
	var uid, name, interval string
	var createdAt, updatedAt int64
	var itemType, itemValue sql.NullString

	count := 0
	for rows.Next() {
		err = rows.Scan(&currentID, &orgID, &uid, &name, &interval, &createdAt, &updatedAt, &itemType, &itemValue)
		if err != nil {
			return err
		}

		// Get or create playlist entry
		idx, exists := playlistIndex[currentID]
		var pl *playlistData
		if !exists {
			pl = &playlistData{
				id:        currentID,
				uid:       uid,
				name:      name,
				interval:  interval,
				items:     []playlistv1.PlaylistItem{},
				createdAt: createdAt,
				updatedAt: updatedAt,
			}
			playlistIndex[currentID] = len(playlists)
			playlists = append(playlists, pl)
		} else {
			pl = playlists[idx]
		}

		// Add item if it exists (LEFT JOIN can return NULL for playlists without items)
		if itemType.Valid && itemValue.Valid {
			pl.items = append(pl.items, playlistv1.PlaylistItem{
				Type:  playlistv1.PlaylistPlaylistItemType(itemType.String),
				Value: itemValue.String,
			})
		}
	}
	if err = rows.Err(); err != nil {
		return err
	}

	// Convert to K8s objects and send to stream (order is preserved)
	for _, pl := range playlists {
		playlist := &playlistv1.Playlist{
			TypeMeta: metav1.TypeMeta{
				APIVersion: playlistv1.GroupVersion.String(),
				Kind:       "Playlist",
			},
			ObjectMeta: metav1.ObjectMeta{
				Name:      pl.uid,
				Namespace: opts.Namespace,
				// createdAt is interpreted as Unix milliseconds here.
				CreationTimestamp: metav1.NewTime(time.UnixMilli(pl.createdAt)),
			},
			Spec: playlistv1.PlaylistSpec{
				Title:    pl.name,
				Interval: pl.interval,
				Items:    pl.items,
			},
		}

		// Set updated timestamp if different from created
		if pl.updatedAt != pl.createdAt {
			meta, err := utils.MetaAccessor(playlist)
			if err != nil {
				return err
			}
			updatedTime := time.UnixMilli(pl.updatedAt)
			meta.SetUpdatedTimestamp(&updatedTime)
		}

		body, err := json.Marshal(playlist)
		if err != nil {
			return err
		}
		req := &resourcepb.BulkRequest{
			Key: &resourcepb.ResourceKey{
				Namespace: opts.Namespace,
				Group:     "playlist.grafana.app",
				Resource:  "playlists",
				Name:      pl.uid,
			},
			Value:  body,
			Action: resourcepb.BulkRequest_ADDED,
		}
		opts.Progress(count, fmt.Sprintf("%s (%d)", pl.name, len(req.Value)))
		count++
		err = stream.Send(req)
		if err != nil {
			// io.EOF from Send means the stream was closed by the other side;
			// treat it as a clean stop (nil), but do not send further playlists.
			if errors.Is(err, io.EOF) {
				err = nil
			}
			return err
		}
	}
	opts.Progress(-2, fmt.Sprintf("finished playlists... (%d)", len(playlists)))
	return nil
}
// playlistQuery carries the parameters for the playlist listing SQL template.
type playlistQuery struct {
	// OrgID restricts the listing to a single organization.
	OrgID int64
}

// sqlPlaylistQuery is the full template rendering request: a dialect-aware
// SQLTemplate plus the query parameters and resolved table names.
type sqlPlaylistQuery struct {
	sqltemplate.SQLTemplate
	Query             *playlistQuery
	PlaylistTable     string
	PlaylistItemTable string
}

// Validate implements the sqltemplate validation hook; this query has nothing
// to check.
func (r sqlPlaylistQuery) Validate() error {
	return nil
}
// newPlaylistQueryReq assembles the template request used to render the
// playlist listing SQL for the dialect of the given database helper.
func newPlaylistQueryReq(sql *legacysql.LegacyDatabaseHelper, query *playlistQuery) sqlPlaylistQuery {
	var req sqlPlaylistQuery
	req.SQLTemplate = sqltemplate.New(sql.DialectForDriver())
	req.Query = query
	req.PlaylistTable = sql.Table("playlist")
	req.PlaylistItemTable = sql.Table("playlist_item")
	return req
}
// listPlaylists renders and runs the playlist listing query for one org and
// returns the raw rows; the caller is responsible for closing them.
func (m *playlistMigrator) listPlaylists(ctx context.Context, orgID int64) (*sql.Rows, error) {
	db, err := m.sql(ctx)
	if err != nil {
		return nil, err
	}
	tmplReq := newPlaylistQueryReq(db, &playlistQuery{OrgID: orgID})
	q, err := sqltemplate.Execute(sqlQueryPlaylists, tmplReq)
	if err != nil {
		return nil, fmt.Errorf("execute template %q: %w", sqlQueryPlaylists.Name(), err)
	}
	return db.DB.GetSqlxSession().Query(ctx, q, tmplReq.GetArgs()...)
}

View file

@ -0,0 +1,40 @@
package playlist
import (
"testing"
"text/template"
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate/mocks"
)
// TestPlaylistQueries snapshot-tests the rendered playlist listing SQL.
func TestPlaylistQueries(t *testing.T) {
	// Fake database helper that namespaces every table with a "grafana." prefix.
	helper := &legacysql.LegacyDatabaseHelper{
		Table: func(name string) string {
			return "grafana." + name
		},
	}

	buildQuery := func(q *playlistQuery) sqltemplate.SQLTemplate {
		req := newPlaylistQueryReq(helper, q)
		req.SQLTemplate = mocks.NewTestingSQLTemplate()
		return &req
	}

	mocks.CheckQuerySnapshots(t, mocks.TemplateTestSetup{
		RootDir:        "testdata",
		SQLTemplatesFS: playlistSQLTemplatesFS,
		Templates: map[*template.Template][]mocks.TemplateTestCase{
			sqlQueryPlaylists: {
				{Name: "list", Data: buildQuery(&playlistQuery{OrgID: 1})},
			},
		},
	})
}

View file

@ -2,17 +2,12 @@ package shorturl
import (
shorturl "github.com/grafana/grafana/apps/shorturl/pkg/apis/shorturl/v1beta1"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/registry/apps/shorturl/migrator"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"k8s.io/apimachinery/pkg/runtime/schema"
)
/*
ShortURLMigration returns the migration definition for shorturls.
It lives in the shorturl package so the shorturl team owns their migration
definition, decoupled from the dashboard accessor.
*/
func ShortURLMigration(migrator legacy.ShortURLMigrator) migrations.MigrationDefinition {
func ShortURLMigration(migrator migrator.ShortURLMigrator) migrations.MigrationDefinition {
shortURLGR := schema.GroupResource{Group: shorturl.APIGroup, Resource: "shorturls"}
return migrations.MigrationDefinition{

View file

@ -0,0 +1,170 @@
package migrator
import (
"context"
"database/sql"
"embed"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"text/template"
"time"
claims "github.com/grafana/authlib/types"
shorturlv1beta1 "github.com/grafana/grafana/apps/shorturl/pkg/apis/shorturl/v1beta1"
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
//go:embed query_shorturls.sql
var shortURLSQLTemplatesFS embed.FS

// sqlQueryShortURLs is the parsed short URL listing SQL template; Must panics
// at package init if the embedded file fails to parse.
var sqlQueryShortURLs = template.Must(
	template.New("sql").ParseFS(shortURLSQLTemplatesFS, "query_shorturls.sql"),
).Lookup("query_shorturls.sql")
// ShortURLMigrator migrates short URLs from legacy SQL storage into unified
// storage via the bulk process stream.
type ShortURLMigrator interface {
	MigrateShortURLs(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
}

// shortURLMigrator handles migrating short URLs from legacy SQL storage.
type shortURLMigrator struct {
	// sql resolves the legacy database helper (dialect + table naming) on demand.
	sql legacysql.LegacyDatabaseProvider
}
// ProvideShortURLMigrator is the wire DI provider for the short URL migrator.
func ProvideShortURLMigrator(sql legacysql.LegacyDatabaseProvider) ShortURLMigrator {
	m := &shortURLMigrator{
		sql: sql,
	}
	return m
}
// MigrateShortURLs handles the short URL migration logic: it reads every short
// URL for the org from legacy SQL storage and streams each one as a ShortURL
// resource to the unified storage bulk process API.
//
// Progress is reported via opts.Progress: -1 is emitted at the start of the
// phase and -2 at the end.
func (m *shortURLMigrator) MigrateShortURLs(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
	opts.Progress(-1, "migrating short URLs...")

	rows, err := m.ListShortURLs(ctx, orgId)
	// Close rows even when ListShortURLs also returned an error alongside them.
	if rows != nil {
		defer func() {
			_ = rows.Close()
		}()
	}
	if err != nil {
		return err
	}

	// Scan targets reused for every row.
	var id int64
	var orgID int64
	var uid, path string
	var createdBy int64
	var createdAt int64
	var lastSeenAt int64
	count := 0
	for rows.Next() {
		err = rows.Scan(&id, &orgID, &uid, &path, &createdBy, &createdAt, &lastSeenAt)
		if err != nil {
			return err
		}
		shortURL := &shorturlv1beta1.ShortURL{
			TypeMeta: metav1.TypeMeta{
				APIVersion: shorturlv1beta1.GroupVersion.String(),
				Kind:       "ShortURL",
			},
			ObjectMeta: metav1.ObjectMeta{
				Name:      uid,
				Namespace: opts.Namespace,
				// NOTE(review): createdAt is treated as Unix seconds here while
				// the playlist migrator uses UnixMilli — confirm the legacy
				// column units are actually different.
				CreationTimestamp: metav1.NewTime(time.Unix(createdAt, 0)),
			},
			Spec: shorturlv1beta1.ShortURLSpec{
				Path: path,
			},
			Status: shorturlv1beta1.ShortURLStatus{
				LastSeenAt: lastSeenAt,
			},
		}
		// Only attribute a creator when the legacy row has a positive user ID.
		if createdBy > 0 {
			shortURL.SetCreatedBy(claims.NewTypeID(claims.TypeUser, strconv.FormatInt(createdBy, 10)))
		}
		body, err := json.Marshal(shortURL)
		if err != nil {
			return err
		}
		req := &resourcepb.BulkRequest{
			Key: &resourcepb.ResourceKey{
				Namespace: opts.Namespace,
				Group:     shorturlv1beta1.APIGroup,
				Resource:  "shorturls",
				Name:      uid,
			},
			Value:  body,
			Action: resourcepb.BulkRequest_ADDED,
		}
		opts.Progress(count, fmt.Sprintf("%s (%d)", uid, len(req.Value)))
		count++
		err = stream.Send(req)
		if err != nil {
			// io.EOF from Send means the stream was closed by the other side;
			// treat it as a clean stop (nil), but do not send further rows.
			if errors.Is(err, io.EOF) {
				err = nil
			}
			return err
		}
	}
	if err = rows.Err(); err != nil {
		return err
	}
	opts.Progress(-2, fmt.Sprintf("finished short URLs... (%d)", count))
	return nil
}
// ListShortURLs renders and runs the short URL listing query for one org and
// returns the raw rows; the caller is responsible for closing them.
func (m *shortURLMigrator) ListShortURLs(ctx context.Context, orgID int64) (*sql.Rows, error) {
	db, err := m.sql(ctx)
	if err != nil {
		return nil, err
	}
	tmplReq := newShortURLQueryReq(db, &ShortURLQuery{OrgID: orgID})
	q, err := sqltemplate.Execute(sqlQueryShortURLs, tmplReq)
	if err != nil {
		return nil, fmt.Errorf("execute template %q: %w", sqlQueryShortURLs.Name(), err)
	}
	return db.DB.GetSqlxSession().Query(ctx, q, tmplReq.GetArgs()...)
}
// ShortURLQuery carries the parameters for the short URL listing SQL template.
type ShortURLQuery struct {
	// OrgID restricts the listing to a single organization.
	OrgID int64
}

// sqlShortURLQuery is the full template rendering request: a dialect-aware
// SQLTemplate plus the query parameters and resolved table name.
type sqlShortURLQuery struct {
	sqltemplate.SQLTemplate
	Query         *ShortURLQuery
	ShortURLTable string
}

// Validate implements the sqltemplate validation hook; this query has nothing
// to check.
func (r sqlShortURLQuery) Validate() error {
	return nil
}
// newShortURLQueryReq assembles the template request used to render the
// short URL listing SQL for the dialect of the given database helper.
func newShortURLQueryReq(sql *legacysql.LegacyDatabaseHelper, query *ShortURLQuery) sqlShortURLQuery {
	var req sqlShortURLQuery
	req.SQLTemplate = sqltemplate.New(sql.DialectForDriver())
	req.Query = query
	req.ShortURLTable = sql.Table("short_url")
	return req
}

View file

@ -0,0 +1,40 @@
package migrator
import (
"testing"
"text/template"
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate/mocks"
)
// TestShortURLQueries snapshot-tests the rendered short URL listing SQL.
func TestShortURLQueries(t *testing.T) {
	// Fake database helper that namespaces every table with a "grafana." prefix.
	helper := &legacysql.LegacyDatabaseHelper{
		Table: func(name string) string {
			return "grafana." + name
		},
	}

	buildQuery := func(q *ShortURLQuery) sqltemplate.SQLTemplate {
		req := newShortURLQueryReq(helper, q)
		req.SQLTemplate = mocks.NewTestingSQLTemplate()
		return &req
	}

	mocks.CheckQuerySnapshots(t, mocks.TemplateTestSetup{
		RootDir:        "testdata",
		SQLTemplatesFS: shortURLSQLTemplatesFS,
		Templates: map[*template.Template][]mocks.TemplateTestCase{
			sqlQueryShortURLs: {
				{Name: "list", Data: buildQuery(&ShortURLQuery{OrgID: 1})},
			},
		},
	})
}

View file

@ -11,4 +11,4 @@ FROM
WHERE
s.org_id = {{ .Arg .Query.OrgID }}
ORDER BY
s.id ASC
s.id ASC

View file

@ -43,7 +43,8 @@ import (
"github.com/grafana/grafana/pkg/middleware/loggermw"
apiregistry "github.com/grafana/grafana/pkg/registry/apis"
dashboardmigration "github.com/grafana/grafana/pkg/registry/apis/dashboard"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
dashboardlegacy "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
dashboardmigrator "github.com/grafana/grafana/pkg/registry/apis/dashboard/migrator"
secretclock "github.com/grafana/grafana/pkg/registry/apis/secret/clock"
secretcontracts "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
secretdecrypt "github.com/grafana/grafana/pkg/registry/apis/secret/decrypt"
@ -56,7 +57,9 @@ import (
secretvalidator "github.com/grafana/grafana/pkg/registry/apis/secret/validator"
appregistry "github.com/grafana/grafana/pkg/registry/apps"
playlistmigration "github.com/grafana/grafana/pkg/registry/apps/playlist"
playlistmigrator "github.com/grafana/grafana/pkg/registry/apps/playlist/migrator"
shorturlmigration "github.com/grafana/grafana/pkg/registry/apps/shorturl"
shorturlmigrator "github.com/grafana/grafana/pkg/registry/apps/shorturl/migrator"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
"github.com/grafana/grafana/pkg/services/accesscontrol/dualwrite"
@ -243,9 +246,10 @@ var wireBasicSet = wire.NewSet(
wire.Bind(new(usagestats.Service), new(*uss.UsageStats)),
validator.ProvideService,
provisioning.ProvideStubProvisioningService,
legacy.ProvideMigratorDashboardAccessor,
legacy.ProvidePlaylistMigrator,
legacy.ProvideShortURLMigrator,
dashboardlegacy.ProvideMigrator,
dashboardmigrator.ProvideFoldersDashboardsMigrator,
playlistmigrator.ProvidePlaylistMigrator,
shorturlmigrator.ProvideShortURLMigrator,
provideMigrationRegistry,
unifiedmigrations.ProvideUnifiedMigrator,
pluginsintegration.WireSet,
@ -579,17 +583,17 @@ func InitializeDocumentBuilders(cfg *setting.Cfg) (resource.DocumentBuilderSuppl
}
/*
provideMigrationRegistry builds the MigrationRegistry directly from the
underlying dependencies. When adding a new resource migration, add the
dependency parameter here and register it with the registry.
provideMigrationRegistry builds the MigrationRegistry from individual
resource migrators. When adding a new resource migration, register
it with the registry here.
*/
func provideMigrationRegistry(
accessor legacy.MigrationDashboardAccessor,
playlistMigrator legacy.PlaylistMigrator,
shortURLMigrator legacy.ShortURLMigrator,
dashMigrator dashboardmigrator.FoldersDashboardsMigrator,
playlistMigrator playlistmigrator.PlaylistMigrator,
shortURLMigrator shorturlmigrator.ShortURLMigrator,
) *unifiedmigrations.MigrationRegistry {
r := unifiedmigrations.NewMigrationRegistry()
r.Register(dashboardmigration.FoldersDashboardsMigration(accessor))
r.Register(dashboardmigration.FoldersDashboardsMigration(dashMigrator))
r.Register(playlistmigration.PlaylistMigration(playlistMigrator))
r.Register(shorturlmigration.ShortURLMigration(shortURLMigrator))
return r

61
pkg/server/wire_gen.go generated

File diff suppressed because one or more lines are too long

View file

@ -128,7 +128,7 @@ SQL table and streams resources to unified storage:
func (a *myAccess) MigrateMyResources(
ctx context.Context,
orgId int64,
opts legacy.MigrateOptions,
opts MigrateOptions,
stream resourcepb.BulkStore_BulkProcessClient,
) error {
rows, err := a.listResources(ctx, orgId)
@ -156,7 +156,7 @@ Define a small interface in the legacy or types package so that Wire can provide
```go
type MyResourceMigrator interface {
MigrateMyResources(ctx context.Context, orgId int64, opts legacy.MigrateOptions,
MigrateMyResources(ctx context.Context, orgId int64, opts MigrateOptions,
stream resourcepb.BulkStore_BulkProcessClient) error
}
```
@ -218,12 +218,12 @@ myresource.ProvideMyResourceMigrator,
```go
func provideMigrationRegistry(
accessor legacy.MigrationDashboardAccessor,
playlistMigrator legacy.PlaylistMigrator,
dashMigrator dashboardmigrator.DashboardMigrator,
playlistMigrator playlistmigrator.PlaylistMigrator,
myResourceMigrator myresource.MyResourceMigrator, // <-- add parameter
) *unifiedmigrations.MigrationRegistry {
r := unifiedmigrations.NewMigrationRegistry()
r.Register(dashboardmigration.FoldersDashboardsMigration(accessor))
r.Register(dashboardmigration.FoldersDashboardsMigration(dashMigrator))
r.Register(playlistmigration.PlaylistMigration(playlistMigrator))
r.Register(myresource.MyResourceMigration(myResourceMigrator)) // <-- register
return r

View file

@ -7,7 +7,6 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"google.golang.org/grpc/metadata"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -17,17 +16,24 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
// MigrateOptions contains configuration for a resource migration operation.
type MigrateOptions struct {
Namespace string
Resources []schema.GroupResource
WithHistory bool // only applies to dashboards
Progress func(count int, msg string)
}
// Read from legacy and write into unified storage
//
//go:generate mockery --name UnifiedMigrator --structname MockUnifiedMigrator --inpackage --filename migrator_mock.go --with-expecter
type UnifiedMigrator interface {
Migrate(ctx context.Context, opts legacy.MigrateOptions) (*resourcepb.BulkResponse, error)
Migrate(ctx context.Context, opts MigrateOptions) (*resourcepb.BulkResponse, error)
RebuildIndexes(ctx context.Context, opts RebuildIndexOptions) error
}
// unifiedMigration handles the migration of legacy resources to unified storage
type unifiedMigration struct {
legacy.MigrationDashboardAccessor
streamProvider streamProvider
client resource.SearchClient
log log.Logger
@ -36,10 +42,10 @@ type unifiedMigration struct {
// streamProvider abstracts the different ways to create a bulk process stream
type streamProvider interface {
createStream(ctx context.Context, opts legacy.MigrateOptions, registry *MigrationRegistry) (resourcepb.BulkStore_BulkProcessClient, error)
createStream(ctx context.Context, opts MigrateOptions, registry *MigrationRegistry) (resourcepb.BulkStore_BulkProcessClient, error)
}
func buildCollectionSettings(opts legacy.MigrateOptions, registry *MigrationRegistry) resource.BulkSettings {
func buildCollectionSettings(opts MigrateOptions, registry *MigrationRegistry) resource.BulkSettings {
settings := resource.BulkSettings{SkipValidation: true}
for _, res := range opts.Resources {
key := buildResourceKey(res, opts.Namespace, registry)
@ -54,20 +60,18 @@ type resourceClientStreamProvider struct {
client resource.ResourceClient
}
func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts legacy.MigrateOptions, registry *MigrationRegistry) (resourcepb.BulkStore_BulkProcessClient, error) {
func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts MigrateOptions, registry *MigrationRegistry) (resourcepb.BulkStore_BulkProcessClient, error) {
settings := buildCollectionSettings(opts, registry)
ctx = metadata.NewOutgoingContext(ctx, settings.ToMD())
return r.client.BulkProcess(ctx)
}
// This can migrate Folders, Dashboards and LibraryPanels
// This can migrate Folders, Dashboards, LibraryPanels and Playlists
func ProvideUnifiedMigrator(
dashboardAccess legacy.MigrationDashboardAccessor,
client resource.ResourceClient,
registry *MigrationRegistry,
) UnifiedMigrator {
return newUnifiedMigrator(
dashboardAccess,
&resourceClientStreamProvider{client: client},
client,
log.New("storage.unified.migrator"),
@ -76,22 +80,20 @@ func ProvideUnifiedMigrator(
}
func newUnifiedMigrator(
dashboardAccess legacy.MigrationDashboardAccessor,
streamProvider streamProvider,
client resource.SearchClient,
log log.Logger,
registry *MigrationRegistry,
) UnifiedMigrator {
return &unifiedMigration{
MigrationDashboardAccessor: dashboardAccess,
streamProvider: streamProvider,
client: client,
log: log,
registry: registry,
streamProvider: streamProvider,
client: client,
log: log,
registry: registry,
}
}
func (m *unifiedMigration) Migrate(ctx context.Context, opts legacy.MigrateOptions) (*resourcepb.BulkResponse, error) {
func (m *unifiedMigration) Migrate(ctx context.Context, opts MigrateOptions) (*resourcepb.BulkResponse, error) {
info, err := authlib.ParseNamespace(opts.Namespace)
if err != nil {
return nil, err
@ -104,10 +106,6 @@ func (m *unifiedMigration) Migrate(ctx context.Context, opts legacy.MigrateOptio
return nil, fmt.Errorf("missing resource selector")
}
if opts.OnlyCount {
return m.CountResources(ctx, opts)
}
stream, err := m.streamProvider.createStream(ctx, opts, m.registry)
if err != nil {
return nil, err

View file

@ -5,10 +5,8 @@ package migrations
import (
context "context"
legacy "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
mock "github.com/stretchr/testify/mock"
resourcepb "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
mock "github.com/stretchr/testify/mock"
)
// MockUnifiedMigrator is an autogenerated mock type for the UnifiedMigrator type
@ -25,7 +23,7 @@ func (_m *MockUnifiedMigrator) EXPECT() *MockUnifiedMigrator_Expecter {
}
// Migrate provides a mock function with given fields: ctx, opts
func (_m *MockUnifiedMigrator) Migrate(ctx context.Context, opts legacy.MigrateOptions) (*resourcepb.BulkResponse, error) {
func (_m *MockUnifiedMigrator) Migrate(ctx context.Context, opts MigrateOptions) (*resourcepb.BulkResponse, error) {
ret := _m.Called(ctx, opts)
if len(ret) == 0 {
@ -34,10 +32,10 @@ func (_m *MockUnifiedMigrator) Migrate(ctx context.Context, opts legacy.MigrateO
var r0 *resourcepb.BulkResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, legacy.MigrateOptions) (*resourcepb.BulkResponse, error)); ok {
if rf, ok := ret.Get(0).(func(context.Context, MigrateOptions) (*resourcepb.BulkResponse, error)); ok {
return rf(ctx, opts)
}
if rf, ok := ret.Get(0).(func(context.Context, legacy.MigrateOptions) *resourcepb.BulkResponse); ok {
if rf, ok := ret.Get(0).(func(context.Context, MigrateOptions) *resourcepb.BulkResponse); ok {
r0 = rf(ctx, opts)
} else {
if ret.Get(0) != nil {
@ -45,7 +43,7 @@ func (_m *MockUnifiedMigrator) Migrate(ctx context.Context, opts legacy.MigrateO
}
}
if rf, ok := ret.Get(1).(func(context.Context, legacy.MigrateOptions) error); ok {
if rf, ok := ret.Get(1).(func(context.Context, MigrateOptions) error); ok {
r1 = rf(ctx, opts)
} else {
r1 = ret.Error(1)
@ -61,14 +59,14 @@ type MockUnifiedMigrator_Migrate_Call struct {
// Migrate is a helper method to define mock.On call
// - ctx context.Context
// - opts legacy.MigrateOptions
// - opts MigrateOptions
func (_e *MockUnifiedMigrator_Expecter) Migrate(ctx interface{}, opts interface{}) *MockUnifiedMigrator_Migrate_Call {
return &MockUnifiedMigrator_Migrate_Call{Call: _e.mock.On("Migrate", ctx, opts)}
}
func (_c *MockUnifiedMigrator_Migrate_Call) Run(run func(ctx context.Context, opts legacy.MigrateOptions)) *MockUnifiedMigrator_Migrate_Call {
func (_c *MockUnifiedMigrator_Migrate_Call) Run(run func(ctx context.Context, opts MigrateOptions)) *MockUnifiedMigrator_Migrate_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(legacy.MigrateOptions))
run(args[0].(context.Context), args[1].(MigrateOptions))
})
return _c
}
@ -78,7 +76,7 @@ func (_c *MockUnifiedMigrator_Migrate_Call) Return(_a0 *resourcepb.BulkResponse,
return _c
}
func (_c *MockUnifiedMigrator_Migrate_Call) RunAndReturn(run func(context.Context, legacy.MigrateOptions) (*resourcepb.BulkResponse, error)) *MockUnifiedMigrator_Migrate_Call {
func (_c *MockUnifiedMigrator_Migrate_Call) RunAndReturn(run func(context.Context, MigrateOptions) (*resourcepb.BulkResponse, error)) *MockUnifiedMigrator_Migrate_Call {
_c.Call.Return(run)
return _c
}

View file

@ -10,10 +10,12 @@ import (
authlib "github.com/grafana/authlib/types"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/infra/db"
dashboardpkg "github.com/grafana/grafana/pkg/registry/apis/dashboard"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
playlistpkg "github.com/grafana/grafana/pkg/registry/apps/playlist"
shorturlpkg "github.com/grafana/grafana/pkg/registry/apps/shorturl"
dashboard "github.com/grafana/grafana/pkg/registry/apis/dashboard"
dashboardmigrator "github.com/grafana/grafana/pkg/registry/apis/dashboard/migrator"
playlist "github.com/grafana/grafana/pkg/registry/apps/playlist"
playlistmigrator "github.com/grafana/grafana/pkg/registry/apps/playlist/migrator"
shorturl "github.com/grafana/grafana/pkg/registry/apps/shorturl"
shorturlmigrator "github.com/grafana/grafana/pkg/registry/apps/shorturl/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
"github.com/grafana/grafana/pkg/storage/unified/migrations/testcases"
@ -401,15 +403,11 @@ func TestUnifiedMigration_RebuildIndexes(t *testing.T) {
Times(tt.numRetries)
// Create migrator with mock client
mockAccessor := &legacy.MockMigrationDashboardAccessor{}
mockPlaylist := &legacy.MockPlaylistMigrator{}
mockShortURL := &legacy.MockShortURLMigrator{}
registry := migrations.NewMigrationRegistry()
registry.Register(dashboardpkg.FoldersDashboardsMigration(mockAccessor))
registry.Register(playlistpkg.PlaylistMigration(mockPlaylist))
registry.Register(shorturlpkg.ShortURLMigration(mockShortURL))
registry.Register(dashboard.FoldersDashboardsMigration(dashboardmigrator.ProvideFoldersDashboardsMigrator(nil)))
registry.Register(playlist.PlaylistMigration(playlistmigrator.ProvidePlaylistMigrator(nil)))
registry.Register(shorturl.ShortURLMigration(shorturlmigrator.ProvideShortURLMigrator(nil)))
migrator := migrations.ProvideUnifiedMigrator(
mockAccessor,
mockClient,
registry,
)
@ -461,15 +459,11 @@ func TestUnifiedMigration_RebuildIndexes_RetrySuccess(t *testing.T) {
Once()
// Create migrator with mock client
mockAccessor := &legacy.MockMigrationDashboardAccessor{}
mockPlaylist := &legacy.MockPlaylistMigrator{}
mockShortURL := &legacy.MockShortURLMigrator{}
registry := migrations.NewMigrationRegistry()
registry.Register(dashboardpkg.FoldersDashboardsMigration(mockAccessor))
registry.Register(playlistpkg.PlaylistMigration(mockPlaylist))
registry.Register(shorturlpkg.ShortURLMigration(mockShortURL))
registry.Register(dashboard.FoldersDashboardsMigration(dashboardmigrator.ProvideFoldersDashboardsMigrator(nil)))
registry.Register(playlist.PlaylistMigration(playlistmigrator.ProvidePlaylistMigrator(nil)))
registry.Register(shorturl.ShortURLMigration(shorturlmigrator.ProvideShortURLMigrator(nil)))
migrator := migrations.ProvideUnifiedMigrator(
mockAccessor,
mockClient,
registry,
)
@ -653,15 +647,11 @@ func TestUnifiedMigration_RebuildIndexes_UsingDistributor(t *testing.T) {
Times(tt.numRetries)
// Create migrator with mock client
mockAccessor := &legacy.MockMigrationDashboardAccessor{}
mockPlaylist := &legacy.MockPlaylistMigrator{}
mockShortURL := &legacy.MockShortURLMigrator{}
registry := migrations.NewMigrationRegistry()
registry.Register(dashboardpkg.FoldersDashboardsMigration(mockAccessor))
registry.Register(playlistpkg.PlaylistMigration(mockPlaylist))
registry.Register(shorturlpkg.ShortURLMigration(mockShortURL))
registry.Register(dashboard.FoldersDashboardsMigration(dashboardmigrator.ProvideFoldersDashboardsMigrator(nil)))
registry.Register(playlist.PlaylistMigration(playlistmigrator.ProvidePlaylistMigrator(nil)))
registry.Register(shorturl.ShortURLMigration(shorturlmigrator.ProvideShortURLMigrator(nil)))
migrator := migrations.ProvideUnifiedMigrator(
mockAccessor,
mockClient,
registry,
)
@ -729,15 +719,11 @@ func TestUnifiedMigration_RebuildIndexes_UsingDistributor_RetrySuccess(t *testin
Once()
// Create migrator with mock client
mockAccessor := &legacy.MockMigrationDashboardAccessor{}
mockPlaylist := &legacy.MockPlaylistMigrator{}
mockShortURL := &legacy.MockShortURLMigrator{}
registry := migrations.NewMigrationRegistry()
registry.Register(dashboardpkg.FoldersDashboardsMigration(mockAccessor))
registry.Register(playlistpkg.PlaylistMigration(mockPlaylist))
registry.Register(shorturlpkg.ShortURLMigration(mockShortURL))
registry.Register(dashboard.FoldersDashboardsMigration(dashboardmigrator.ProvideFoldersDashboardsMigrator(nil)))
registry.Register(playlist.PlaylistMigration(playlistmigrator.ProvidePlaylistMigrator(nil)))
registry.Register(shorturl.ShortURLMigration(shorturlmigrator.ProvideShortURLMigrator(nil)))
migrator := migrations.ProvideUnifiedMigrator(
mockAccessor,
mockClient,
registry,
)

View file

@ -5,7 +5,6 @@ import (
"sync"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/xorm"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -23,7 +22,7 @@ type Validator interface {
type ValidatorFactory func(client resourcepb.ResourceIndexClient, driverName string) Validator
// MigratorFunc is the signature for resource migration functions.
type MigratorFunc = func(ctx context.Context, orgId int64, opts legacy.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
type MigratorFunc = func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
// ResourceInfo extends GroupResource with additional metadata needed for migration.
type ResourceInfo struct {

View file

@ -5,7 +5,6 @@ import (
"sync"
"testing"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -95,7 +94,7 @@ func TestMigrationRegistry_Register(t *testing.T) {
MigrationID: "widgets migration log id",
Resources: []ResourceInfo{ri},
Migrators: map[schema.GroupResource]MigratorFunc{
gr: func(ctx context.Context, orgId int64, opts legacy.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
gr: func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
return nil
},
},
@ -242,7 +241,7 @@ func TestMigrationRegistry_All(t *testing.T) {
}
func TestMigrationRegistry_HasResource(t *testing.T) {
noopMigrator := func(ctx context.Context, orgId int64, opts legacy.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
noopMigrator := func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
return nil
}
@ -392,7 +391,7 @@ func TestMigrationDefinition_GetMigratorFunc(t *testing.T) {
called := false
def := MigrationDefinition{
Migrators: map[schema.GroupResource]MigratorFunc{
gr: func(ctx context.Context, orgId int64, opts legacy.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
gr: func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
called = true
return nil
},
@ -402,7 +401,7 @@ func TestMigrationDefinition_GetMigratorFunc(t *testing.T) {
result := def.GetMigratorFunc(gr)
require.NotNil(t, result)
err := result(context.Background(), 1, legacy.MigrateOptions{}, nil)
err := result(context.Background(), 1, MigrateOptions{}, nil)
require.NoError(t, err)
require.True(t, called)
})
@ -411,7 +410,7 @@ func TestMigrationDefinition_GetMigratorFunc(t *testing.T) {
existingGR := testGroupResource("existing.group", "widgets")
def := MigrationDefinition{
Migrators: map[schema.GroupResource]MigratorFunc{
existingGR: func(ctx context.Context, orgId int64, opts legacy.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
existingGR: func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
return nil
},
},
@ -435,7 +434,7 @@ func TestMigrationDefinition_GetMigratorFunc(t *testing.T) {
}
func TestMigrationRegistry_GetMigratorFunc(t *testing.T) {
noopMigrator := func(ctx context.Context, orgId int64, opts legacy.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
noopMigrator := func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
return nil
}
@ -495,7 +494,7 @@ func TestMigrationRegistry_GetMigratorFunc(t *testing.T) {
}
func TestMigrationRegistry_ConcurrentAccess(t *testing.T) {
noopMigrator := func(ctx context.Context, orgId int64, opts legacy.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
noopMigrator := func(ctx context.Context, orgId int64, opts MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error {
return nil
}

View file

@ -8,7 +8,6 @@ import (
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
@ -117,7 +116,7 @@ func (r *MigrationRunner) MigrateOrg(ctx context.Context, sess *xorm.Session, in
startTime := time.Now()
migrateOpts := legacy.MigrateOptions{
migrateOpts := MigrateOptions{
Namespace: info.Value,
Resources: r.resources,
WithHistory: true, // Migrate with full history