This commit is contained in:
StackoverFrog 2026-04-02 19:29:40 +00:00 committed by GitHub
commit 8c33effbba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 535 additions and 8 deletions

View file

@ -89,11 +89,51 @@ func (cfgmaps *ConfigMaps) Get(key string) (release.Releaser, error) {
return r, nil
}
// listPages is a common method to list release with pagination
func (cfgmaps *ConfigMaps) listPages(f func(page []release.Releaser, remaining bool) (end bool), opts metav1.ListOptions, filter func(release.Releaser) bool) (err error) {
if opts.Limit == 0 {
opts.Limit = DefaultPaginationLimit
}
Loop:
for {
list, err := cfgmaps.impl.List(context.Background(), opts)
if nil != err {
return err
}
opts.Continue = list.Continue
var results []release.Releaser
// iterate over the configmaps object list
// and decode each release
for _, item := range list.Items {
rls, err := decodeRelease(string(item.Data["release"]))
if err != nil {
cfgmaps.Logger().Debug("failed to decode release", "item", item, slog.Any("error", err))
continue
}
rls.Labels = item.Labels
if filter(rls) {
results = append(results, rls)
}
}
if f(results, list.Continue != "") {
break Loop
}
}
return nil
}
// List fetches all releases and returns the list releases such
// that filter(release) == true. An error is returned if the
// configmap fails to retrieve the releases.
func (cfgmaps *ConfigMaps) List(filter func(release.Releaser) bool) ([]release.Releaser, error) {
lsel := kblabels.Set{"owner": "helm"}.AsSelector()
lsel := kblabels.Set{"owner": owner}.AsSelector()
opts := metav1.ListOptions{LabelSelector: lsel.String()}
list, err := cfgmaps.impl.List(context.Background(), opts)
@ -122,6 +162,14 @@ func (cfgmaps *ConfigMaps) List(filter func(release.Releaser) bool) ([]release.R
return results, nil
}
// ListPages same as List, but with pagination
func (cfgmaps *ConfigMaps) ListPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, filter func(release.Releaser) bool) error {
lsel := kblabels.Set{"owner": owner}.AsSelector()
opts := metav1.ListOptions{Limit: limit, LabelSelector: lsel.String()}
return cfgmaps.listPages(f, opts, filter)
}
// Query fetches all releases that match the provided map of labels.
// An error is returned if the configmap fails to retrieve the releases.
func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]release.Releaser, error) {
@ -158,6 +206,21 @@ func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]release.Releaser,
return results, nil
}
// QueryPages same as Query, but with pagination
func (cfgmaps *ConfigMaps) QueryPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, labels map[string]string) error {
ls := kblabels.Set{}
for k, v := range labels {
if errs := validation.IsValidLabelValue(v); len(errs) != 0 {
return fmt.Errorf("invalid label value: %q: %s", v, strings.Join(errs, "; "))
}
ls[k] = v
}
opts := metav1.ListOptions{Limit: limit, LabelSelector: ls.AsSelector().String()}
return cfgmaps.listPages(f, opts, func(release.Releaser) bool { return true })
}
// Create creates a new ConfigMap holding the release. If the
// ConfigMap already exists, ErrReleaseExists is returned.
func (cfgmaps *ConfigMaps) Create(key string, rls release.Releaser) error {
@ -256,8 +319,6 @@ func (cfgmaps *ConfigMaps) Delete(key string) (rls release.Releaser, err error)
// "owner" - owner of the configmap, currently "helm".
// "name" - name of the release.
func newConfigMapsObject(key string, rls *rspb.Release, lbs labels) (*v1.ConfigMap, error) {
const owner = "helm"
// encode the release
s, err := encodeRelease(rls)
if err != nil {

View file

@ -111,6 +111,20 @@ func TestConfigMapList(t *testing.T) {
releaseStub("key-6", 1, "default", common.StatusSuperseded),
}...)
// list releases with pagination
err := cfgmaps.ListPages(func(page []release.Releaser, remaining bool) (end bool) {
// check
if len(page) != 2 {
t.Errorf("Expected 2 cfgmaps, got %d:\n%v\n", len(page), page)
}
return !remaining
}, 2, func(rel release.Releaser) bool { return true })
// check
if err != nil {
t.Errorf("Failed to list cfgmaps: %s", err)
}
// list all deleted releases
del, err := cfgmaps.List(func(rel release.Releaser) bool {
rls := convertReleaserToV1(t, rel)
@ -183,6 +197,18 @@ func TestConfigMapQuery(t *testing.T) {
if !errors.Is(err, ErrReleaseNotFound) {
t.Errorf("Expected {%v}, got {%v}", ErrReleaseNotFound, err)
}
// query cfgmaps with pagination
err = cfgmaps.QueryPages(func(page []release.Releaser, remaining bool) (err bool) {
if len(page) != 2 {
t.Fatalf("Expected 2 results, actual %d", len(page))
}
return !remaining
}, 2, map[string]string{"status": "deployed"})
if err != nil {
t.Fatalf("Failed to query: %s", err)
}
}
func TestConfigMapCreate(t *testing.T) {

View file

@ -24,6 +24,13 @@ import (
rspb "helm.sh/helm/v4/pkg/release/v1"
)
const (
// owner is owner of the secret
owner = "helm"
// DefaultPaginationLimit .
DefaultPaginationLimit = 50
)
var (
// ErrReleaseNotFound indicates that a release is not found.
ErrReleaseNotFound = errors.New("release: not found")
@ -89,7 +96,9 @@ type Deletor interface {
type Queryor interface {
Get(key string) (release.Releaser, error)
List(filter func(release.Releaser) bool) ([]release.Releaser, error)
ListPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, filter func(release.Releaser) bool) error
Query(labels map[string]string) ([]release.Releaser, error)
QueryPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, labels map[string]string) error
}
// Driver is the interface composed of Creator, Updator, Deletor, and Queryor

View file

@ -114,6 +114,72 @@ func (mem *Memory) List(filter func(release.Releaser) bool) ([]release.Releaser,
return ls, nil
}
// ListPages is a common method to list release with pagination
func (mem *Memory) ListPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, filter func(release.Releaser) bool) (err error) {
defer unlock(mem.rlock())
if limit == 0 {
limit = DefaultPaginationLimit
}
ls := make([]release.Releaser, 0, int(limit))
if mem.namespace != "" {
for _, recs := range mem.cache[mem.namespace] {
recs.Iter(func(i int, rec *record) bool {
if filter(rec.rls) {
ls = append(ls, rec.rls)
}
if int64(len(ls)) >= limit {
end := f(ls, i == len(recs))
ls = make([]release.Releaser, 0, int(limit))
return !end
}
return true
})
if len(ls) > 0 {
_ = f(ls, true)
}
}
return nil
}
var total int
for _, releases := range mem.cache {
total += len(releases)
}
var index int
for namespace := range mem.cache {
for _, recs := range mem.cache[namespace] {
recs.Iter(func(_ int, rec *record) bool {
index++
if filter(rec.rls) {
ls = append(ls, rec.rls)
}
if int64(len(ls)) >= limit {
end := f(ls, index == total)
ls = make([]release.Releaser, 0, int(limit))
return !end
}
return true
})
}
}
_ = f(ls, true)
return nil
}
// Query returns the set of releases that match the provided set of labels
func (mem *Memory) Query(keyvals map[string]string) ([]release.Releaser, error) {
defer unlock(mem.rlock())
@ -155,6 +221,81 @@ func (mem *Memory) Query(keyvals map[string]string) ([]release.Releaser, error)
return ls, nil
}
// QueryPages same as Query, but with pagination
func (mem *Memory) QueryPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, keyvals map[string]string) error {
defer unlock(mem.rlock())
if limit == 0 {
limit = DefaultPaginationLimit
}
var lbs labels
lbs.init()
lbs.fromMap(keyvals)
ls := make([]release.Releaser, 0, int(limit))
if mem.namespace != "" {
for _, recs := range mem.cache[mem.namespace] {
recs.Iter(func(i int, rec *record) bool {
if rec.lbs.match(lbs) {
ls = append(ls, rec.rls)
}
if int64(len(ls)) >= limit {
end := f(ls, i == len(recs))
ls = make([]release.Releaser, 0, int(limit))
return !end
}
return true
})
if len(ls) > 0 {
_ = f(ls, true)
}
}
return nil
}
var total int
for _, releases := range mem.cache {
for _, recs := range releases {
total += len(recs)
}
}
var index int
for _, releases := range mem.cache {
for _, recs := range releases {
recs.Iter(func(_ int, rec *record) bool {
index++
if rec.lbs.match(lbs) {
ls = append(ls, rec.rls)
}
if int64(len(ls)) >= limit {
end := f(ls, index == total)
ls = make([]release.Releaser, 0, int(limit))
return !end
}
return true
})
}
}
if len(ls) > 0 {
_ = f(ls, true)
}
return nil
}
// Create creates a new release or returns ErrReleaseExists.
func (mem *Memory) Create(key string, rel release.Releaser) error {
defer unlock(mem.wlock())

View file

@ -107,6 +107,20 @@ func TestMemoryList(t *testing.T) {
ts := tsFixtureMemory(t)
ts.SetNamespace("default")
// list releases with pagination
err := ts.ListPages(func(page []release.Releaser, remaining bool) (end bool) {
// check
if len(page) != 2 {
t.Errorf("Expected 2 releases, got %d:\n%v\n", len(page), page)
}
return !remaining
}, 2, func(rel release.Releaser) bool { return true })
// check
if err != nil {
t.Errorf("Failed to list releases: %s", err)
}
// list all deployed releases
dpl, err := ts.List(func(rel release.Releaser) bool {
rls := convertReleaserToV1(t, rel)
@ -255,6 +269,19 @@ func TestMemoryDelete(t *testing.T) {
ts := tsFixtureMemory(t)
ts.SetNamespace("")
// query cfgmaps with pagination
err := ts.QueryPages(func(page []release.Releaser, remaining bool) (err bool) {
if len(page) != 1 {
t.Fatalf("Expected 1 results, actual %d", len(page))
}
return !remaining
}, 1, map[string]string{"status": "deployed"})
if err != nil {
t.Fatalf("Failed to query: %s", err)
}
start, err := ts.Query(map[string]string{"status": "deployed"})
if err != nil {
t.Errorf("Query failed: %s", err)

View file

@ -19,6 +19,7 @@ package driver // import "helm.sh/helm/v4/pkg/storage/driver"
import (
"context"
"fmt"
"strconv"
"testing"
sqlmock "github.com/DATA-DOG/go-sqlmock"
@ -133,11 +134,32 @@ func (mock *MockConfigMapsInterface) List(_ context.Context, opts metav1.ListOpt
return nil, err
}
objects := make([]*v1.ConfigMap, 0, len(mock.objects))
for _, cfgmap := range mock.objects {
if labelSelector.Matches(kblabels.Set(cfgmap.Labels)) {
list.Items = append(list.Items, *cfgmap)
objects = append(objects, cfgmap)
}
}
if opts.Continue != "" {
i, err := strconv.ParseInt(opts.Continue, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid continue value %q: %w", opts.Continue, err)
}
objects = objects[int(i):]
list.Continue = ""
}
if opts.Limit > 0 && opts.Limit <= int64(len(objects)) {
objects = objects[:int(opts.Limit)]
if opts.Limit < int64(len(objects)) {
list.Continue = strconv.FormatInt(opts.Limit, 10)
}
}
for _, cfgmap := range objects {
list.Items = append(list.Items, *cfgmap)
}
return &list, nil
}
@ -221,11 +243,32 @@ func (mock *MockSecretsInterface) List(_ context.Context, opts metav1.ListOption
return nil, err
}
objects := make([]*v1.Secret, 0, len(mock.objects))
for _, secret := range mock.objects {
if labelSelector.Matches(kblabels.Set(secret.Labels)) {
list.Items = append(list.Items, *secret)
objects = append(objects, secret)
}
}
if opts.Continue != "" {
i, err := strconv.ParseInt(opts.Continue, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid continue value: %v", err)
}
objects = objects[int(i):]
list.Continue = ""
}
if opts.Limit > 0 && opts.Limit <= int64(len(objects)) {
objects = objects[:int(opts.Limit)]
if opts.Limit < int64(len(objects)) {
list.Continue = strconv.FormatInt(opts.Limit, 10)
}
}
for _, secret := range objects {
list.Items = append(list.Items, *secret)
}
return &list, nil
}

View file

@ -84,11 +84,51 @@ func (secrets *Secrets) Get(key string) (release.Releaser, error) {
return r, nil
}
// listPages is a common method to list release with pagination
func (secrets *Secrets) listPages(f func(page []release.Releaser, remaining bool) (end bool), opts metav1.ListOptions, filter func(release.Releaser) bool) (err error) {
if opts.Limit == 0 {
opts.Limit = DefaultPaginationLimit
}
Loop:
for {
list, err := secrets.impl.List(context.Background(), opts)
if nil != err {
return err
}
opts.Continue = list.Continue
var results []release.Releaser
// iterate over the secrets object list
// and decode each release
for _, item := range list.Items {
rls, err := decodeRelease(string(item.Data["release"]))
if err != nil {
secrets.Logger().Debug("list failed to decode release", "key", item.Name, slog.Any("error", err))
continue
}
rls.Labels = item.Labels
if filter(rls) {
results = append(results, rls)
}
}
if f(results, list.Continue != "") {
break Loop
}
}
return nil
}
// List fetches all releases and returns the list releases such
// that filter(release) == true. An error is returned if the
// secret fails to retrieve the releases.
func (secrets *Secrets) List(filter func(release.Releaser) bool) ([]release.Releaser, error) {
lsel := kblabels.Set{"owner": "helm"}.AsSelector()
lsel := kblabels.Set{"owner": owner}.AsSelector()
opts := metav1.ListOptions{LabelSelector: lsel.String()}
list, err := secrets.impl.List(context.Background(), opts)
@ -119,6 +159,14 @@ func (secrets *Secrets) List(filter func(release.Releaser) bool) ([]release.Rele
return results, nil
}
// ListPages same as List, but with pagination
func (secrets *Secrets) ListPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, filter func(release.Releaser) bool) error {
lsel := kblabels.Set{"owner": owner}.AsSelector()
opts := metav1.ListOptions{Limit: limit, LabelSelector: lsel.String()}
return secrets.listPages(f, opts, filter)
}
// Query fetches all releases that match the provided map of labels.
// An error is returned if the secret fails to retrieve the releases.
func (secrets *Secrets) Query(labels map[string]string) ([]release.Releaser, error) {
@ -158,6 +206,21 @@ func (secrets *Secrets) Query(labels map[string]string) ([]release.Releaser, err
return results, nil
}
// QueryPages same as Query, but with pagination
func (secrets *Secrets) QueryPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, labels map[string]string) error {
ls := kblabels.Set{}
for k, v := range labels {
if errs := validation.IsValidLabelValue(v); len(errs) != 0 {
return fmt.Errorf("invalid label value: %q: %s", v, strings.Join(errs, "; "))
}
ls[k] = v
}
opts := metav1.ListOptions{Limit: limit, LabelSelector: ls.AsSelector().String()}
return secrets.listPages(f, opts, func(release.Releaser) bool { return true })
}
// Create creates a new Secret holding the release. If the
// Secret already exists, ErrReleaseExists is returned.
func (secrets *Secrets) Create(key string, rel release.Releaser) error {
@ -244,8 +307,6 @@ func (secrets *Secrets) Delete(key string) (rls release.Releaser, err error) {
// "owner" - owner of the secret, currently "helm".
// "name" - name of the release.
func newSecretsObject(key string, rls *rspb.Release, lbs labels) (*v1.Secret, error) {
const owner = "helm"
// encode the release
s, err := encodeRelease(rls)
if err != nil {

View file

@ -96,6 +96,20 @@ func TestSecretList(t *testing.T) {
releaseStub("key-6", 1, "default", common.StatusSuperseded),
}...)
// list releases with pagination
err := secrets.ListPages(func(page []release.Releaser, remaining bool) (end bool) {
// check
if len(page) != 2 {
t.Errorf("Expected 2 secrets, got %d:\n%v\n", len(page), page)
}
return !remaining
}, 2, func(rel release.Releaser) bool { return true })
// check
if err != nil {
t.Errorf("Failed to list secrets: %s", err)
}
// list all deleted releases
del, err := secrets.List(func(rel release.Releaser) bool {
rls := convertReleaserToV1(t, rel)
@ -168,6 +182,18 @@ func TestSecretQuery(t *testing.T) {
if !errors.Is(err, ErrReleaseNotFound) {
t.Errorf("Expected {%v}, got {%v}", ErrReleaseNotFound, err)
}
// query releases with pagination
err = secrets.QueryPages(func(page []release.Releaser, remaining bool) (err bool) {
if len(page) != 2 {
t.Fatalf("Expected 2 results, actual %d", len(page))
}
return !remaining
}, 2, map[string]string{"status": "deployed"})
if err != nil {
t.Fatalf("Failed to query: %s", err)
}
}
func TestSecretCreate(t *testing.T) {

View file

@ -393,6 +393,66 @@ func (s *SQL) List(filter func(release.Releaser) bool) ([]release.Releaser, erro
return releases, nil
}
// ListPages is a common method to list release with pagination
func (s *SQL) ListPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, filter func(release.Releaser) bool) (err error) {
if limit == 0 {
limit = DefaultPaginationLimit
}
sb := s.statementBuilder.
Select(sqlReleaseTableKeyColumn, sqlReleaseTableNamespaceColumn, sqlReleaseTableBodyColumn).
From(sqlReleaseTableName).
Where(sq.Eq{sqlReleaseTableOwnerColumn: sqlReleaseDefaultOwner})
// If a namespace was specified, we only list releases from that namespace
if s.namespace != "" {
sb = sb.Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace})
}
var page, offset uint64
for {
offset = (page - 1) * uint64(limit)
query, args, err := sb.Offset(offset).Limit(uint64(limit)).ToSql()
if err != nil {
s.Logger().Debug("failed to build query", slog.Any("error", err))
return err
}
var records = []SQLReleaseWrapper{}
if err := s.db.Select(&records, query, args...); err != nil {
s.Logger().Debug("failed to list", slog.Any("error", err))
return err
}
var releases []release.Releaser
for _, record := range records {
release, err := decodeRelease(record.Body)
if err != nil {
s.Logger().Debug("failed to decode release", "record", record, slog.Any("error", err))
continue
}
if release.Labels, err = s.getReleaseCustomLabels(record.Key, record.Namespace); err != nil {
s.Logger().Debug("failed to get release custom labels", "namespace", record.Namespace, "key", record.Key, slog.Any("error", err))
return err
}
maps.Copy(release.Labels, getReleaseSystemLabels(release))
if filter(release) {
releases = append(releases, release)
}
}
if f(releases, int64(len(records)) < limit) {
break
}
page++
}
return nil
}
// Query returns the set of releases that match the provided set of labels.
func (s *SQL) Query(labels map[string]string) ([]release.Releaser, error) {
sb := s.statementBuilder.
@ -463,6 +523,73 @@ func (s *SQL) Query(labels map[string]string) ([]release.Releaser, error) {
return releases, nil
}
// QueryPages same as Query, but with pagination
func (s *SQL) QueryPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, labels map[string]string) error {
sb := s.statementBuilder.
Select(sqlReleaseTableKeyColumn, sqlReleaseTableNamespaceColumn, sqlReleaseTableBodyColumn).
From(sqlReleaseTableName)
keys := make([]string, 0, len(labels))
for key := range labels {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
if _, ok := labelMap[key]; ok {
sb = sb.Where(sq.Eq{key: labels[key]})
} else {
s.Logger().Debug("unknown label", "key", key)
return fmt.Errorf("unknown label %s", key)
}
}
// If a namespace was specified, we only list releases from that namespace
if s.namespace != "" {
sb = sb.Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace})
}
var page, offset uint64
for {
// Build our query
offset = (page - 1) * uint64(limit)
query, args, err := sb.Offset(offset).Limit(uint64(limit)).ToSql()
if err != nil {
s.Logger().Debug("failed to build query", slog.Any("error", err))
return err
}
var records = []SQLReleaseWrapper{}
if err := s.db.Select(&records, query, args...); err != nil {
s.Logger().Debug("failed to query with labels", slog.Any("error", err))
return err
}
var releases []release.Releaser
for _, record := range records {
release, err := decodeRelease(record.Body)
if err != nil {
s.Logger().Debug("failed to decode release", "record", record, slog.Any("error", err))
continue
}
if release.Labels, err = s.getReleaseCustomLabels(record.Key, record.Namespace); err != nil {
s.Logger().Debug("failed to get release custom labels", "namespace", record.Namespace, "key", record.Key, slog.Any("error", err))
return err
}
releases = append(releases, release)
}
if f(releases, int64(len(records)) < limit) {
break
}
page++
}
return nil
}
// Create creates a new release.
func (s *SQL) Create(key string, rel release.Releaser) error {
rls, err := releaserToV1Release(rel)

View file

@ -316,9 +316,15 @@ func (d *MaxHistoryMockDriver) Get(key string) (release.Releaser, error) {
func (d *MaxHistoryMockDriver) List(filter func(release.Releaser) bool) ([]release.Releaser, error) {
return d.Driver.List(filter)
}
func (d *MaxHistoryMockDriver) ListPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, filter func(release.Releaser) bool) error {
return d.Driver.ListPages(f, limit, filter)
}
func (d *MaxHistoryMockDriver) Query(labels map[string]string) ([]release.Releaser, error) {
return d.Driver.Query(labels)
}
func (d *MaxHistoryMockDriver) QueryPages(f func(page []release.Releaser, remaining bool) (end bool), limit int64, labels map[string]string) error {
return d.Driver.QueryPages(f, limit, labels)
}
func (d *MaxHistoryMockDriver) Name() string {
return d.Driver.Name()
}