From 42a3d75bf87a59c402ac72dd369ffe898cf8eb7b Mon Sep 17 00:00:00 2001 From: Ling Samuel Date: Wed, 2 Dec 2020 17:35:10 +0800 Subject: [PATCH 1/3] apiserver add --lease-reuse-duration-seconds to config lease reuse duration Signed-off-by: Ling Samuel --- .../app/options/options_test.go | 11 +++++----- .../apiserver/pkg/server/options/etcd.go | 3 +++ .../k8s.io/apiserver/pkg/storage/etcd3/BUILD | 1 + .../pkg/storage/etcd3/lease_manager.go | 12 ++-------- .../pkg/storage/etcd3/lease_manager_test.go | 3 ++- .../apiserver/pkg/storage/etcd3/store.go | 8 +++---- .../apiserver/pkg/storage/etcd3/store_test.go | 22 +++++++++---------- .../pkg/storage/etcd3/watcher_test.go | 5 +++-- .../pkg/storage/storagebackend/config.go | 18 +++++++++------ .../storage/storagebackend/factory/etcd3.go | 2 +- .../k8s.io/apiserver/pkg/storage/tests/BUILD | 1 + .../pkg/storage/tests/cacher_test.go | 3 ++- 12 files changed, 47 insertions(+), 42 deletions(-) diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 2149c0bed25..9bd6857692d 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -155,11 +155,12 @@ func TestAddFlags(t *testing.T) { TrustedCAFile: "/var/run/kubernetes/etcdca.crt", CertFile: "/var/run/kubernetes/etcdce.crt", }, - Paging: true, - Prefix: "/registry", - CompactionInterval: storagebackend.DefaultCompactInterval, - CountMetricPollPeriod: time.Minute, - DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, + Paging: true, + Prefix: "/registry", + CompactionInterval: storagebackend.DefaultCompactInterval, + CountMetricPollPeriod: time.Minute, + DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, + LeaseReuseDurationSeconds: storagebackend.DefaultLeaseReuseDurationSeconds, }, DefaultStorageMediaType: "application/vnd.kubernetes.protobuf", DeleteCollectionWorkers: 1, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 192edeb6b21..d5fed31d027 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -180,6 +180,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval, "The interval of requests to poll etcd and update metric. 0 disables the metric collection") + + fs.Int64Var(&s.StorageConfig.LeaseReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseReuseDurationSeconds, + "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") } func (s *EtcdOptions) ApplyTo(c *server.Config) error { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 982455986f3..1c13b56b084 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -35,6 +35,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go index 6b5a5700a9e..b34c7fb0950 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go @@ -42,8 +42,8 @@ type leaseManager struct { } // newDefaultLeaseManager creates a new lease manager using default setting. -func newDefaultLeaseManager(client *clientv3.Client) *leaseManager { - return newLeaseManager(client, 60, 0.05) +func newDefaultLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64) *leaseManager { + return newLeaseManager(client, leaseReuseDurationSeconds, 0.05) } // newLeaseManager creates a new lease manager with the number of buffered @@ -57,14 +57,6 @@ func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, l } } -// setLeaseReuseDurationSeconds is used for testing purpose. It is used to -// reduce the extra lease duration to avoid unnecessary timeout in testing. -func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) { - l.leaseMu.Lock() - defer l.leaseMu.Unlock() - l.leaseReuseDurationSeconds = duration -} - // GetLease returns a lease based on requested ttl: if the cached previous // lease can be reused, reuse it; otherwise request a new one from etcd. func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go index e63a8e65e70..122453a72d5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package etcd3 import ( + "k8s.io/apiserver/pkg/storage/storagebackend" "testing" ) @@ -34,7 +35,7 @@ func TestGetReuseDurationSeconds(t *testing.T) { duration: 50, }, } - lm := newDefaultLeaseManager(nil) + lm := newDefaultLeaseManager(nil, storagebackend.DefaultLeaseReuseDurationSeconds) for i := 0; i < len(testCases); i++ { dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl) if dur != testCases[i].duration { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 3435b9ffaa9..8ee3c7cba7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -83,11 +83,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { - return newStore(c, pagingEnabled, codec, prefix, transformer) +func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool, leaseReuseDurationSeconds int64) storage.Interface { + return newStore(c, pagingEnabled, leaseReuseDurationSeconds, codec, prefix, transformer) } -func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, pagingEnabled bool, leaseReuseDurationSeconds int64, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := APIObjectVersioner{} result := &store{ client: c, @@ -100,7 +100,7 @@ func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefi // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' pathPrefix: path.Join("/", prefix), watcher: newWatcher(c, codec, versioner, transformer), - leaseManager: newDefaultLeaseManager(c), + leaseManager: newDefaultLeaseManager(c, leaseReuseDurationSeconds), } return result } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index e1204cf2ea2..b9ba9e847d7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -47,6 +47,7 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -818,7 +819,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { @@ -895,8 +896,8 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) - disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + disablePagingStore := newStore(cluster.RandClient(), false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1394,7 +1395,7 @@ func TestListContinuation(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, codec, "", transformer) + store := newStore(etcdClient, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) ctx := context.Background() // Setup storage with the following structure: @@ -1556,7 +1557,7 @@ func TestListContinuationWithFilter(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, codec, "", transformer) + store := newStore(etcdClient, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) ctx := context.Background() preset := []struct { @@ -1659,7 +1660,7 @@ func TestListInconsistentContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1804,12 +1805,11 @@ func TestListInconsistentContinuation(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) - ctx := context.Background() // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 // for testing purposes. See apimachinery/pkg/util/wait/wait.go - store.leaseManager.setLeaseReuseDurationSeconds(1) + store := newStore(cluster.RandClient(), true, 1, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() return ctx, store, cluster } @@ -1850,7 +1850,7 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer) + store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, configuredPrefix, transformer) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } @@ -2017,7 +2017,7 @@ func TestConsistentList(t *testing.T) { transformer := &fancyTransformer{ transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, } - store := newStore(cluster.RandClient(), true, codec, "", transformer) + store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) transformer.store = store for i := 0; i < 5; i++ { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index ad3209ccd47..f7ced0d74a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" ) func TestWatch(t *testing.T) { @@ -225,13 +226,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + invalidStore := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + validStore := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 5dc0bbfb349..3a5a056e9ec 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -28,8 +28,9 @@ const ( StorageTypeUnset = "" StorageTypeETCD3 = "etcd3" - DefaultCompactInterval = 5 * time.Minute - DefaultDBMetricPollInterval = 30 * time.Second + DefaultCompactInterval = 5 * time.Minute + DefaultDBMetricPollInterval = 30 * time.Second + DefaultLeaseReuseDurationSeconds = 60 ) // TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to. @@ -74,14 +75,17 @@ type Config struct { CountMetricPollPeriod time.Duration // DBMetricPollInterval specifies how often should storage backend metric be updated. DBMetricPollInterval time.Duration + // LeaseReuseDurationSeconds specifies time in seconds that each lease is reused. See pkg/storage/etcd3/lease_manager.go + LeaseReuseDurationSeconds int64 } func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { return &Config{ - Paging: true, - Prefix: prefix, - Codec: codec, - CompactionInterval: DefaultCompactInterval, - DBMetricPollInterval: DefaultDBMetricPollInterval, + Paging: true, + Prefix: prefix, + Codec: codec, + CompactionInterval: DefaultCompactInterval, + DBMetricPollInterval: DefaultDBMetricPollInterval, + LeaseReuseDurationSeconds: DefaultLeaseReuseDurationSeconds, } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 26067633e58..16e873ed058 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -249,7 +249,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e if transformer == nil { transformer = value.IdentityTransformer } - return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil + return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging, c.LeaseReuseDurationSeconds), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD index fdd648ff6a5..37cd06ae1a2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD @@ -27,6 +27,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index b9de03e15cc..1092e9bb212 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -46,6 +46,7 @@ import ( cacherstorage "k8s.io/apiserver/pkg/storage/cacher" "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/storagebackend" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -101,7 +102,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true, storagebackend.DefaultLeaseReuseDurationSeconds) return server, storage } From 537b8d3c06e869a6c00e9b645607ab03a09c9adb Mon Sep 17 00:00:00 2001 From: Ling Samuel Date: Mon, 7 Dec 2020 10:45:44 +0800 Subject: [PATCH 2/3] apiserver add metric etcd_lease_object_counts Signed-off-by: Ling Samuel --- cmd/kube-apiserver/app/options/BUILD | 1 + .../app/options/options_test.go | 19 ++++++---- .../apiserver/pkg/server/options/etcd.go | 2 +- .../k8s.io/apiserver/pkg/storage/etcd3/BUILD | 1 - .../pkg/storage/etcd3/lease_manager.go | 37 ++++++++++++++++++- .../pkg/storage/etcd3/lease_manager_test.go | 3 +- .../pkg/storage/etcd3/metrics/metrics.go | 17 +++++++++ .../apiserver/pkg/storage/etcd3/store.go | 8 ++-- .../apiserver/pkg/storage/etcd3/store_test.go | 21 ++++++----- .../pkg/storage/etcd3/watcher_test.go | 5 +-- .../pkg/storage/storagebackend/BUILD | 1 + .../pkg/storage/storagebackend/config.go | 22 +++++------ .../storage/storagebackend/factory/etcd3.go | 2 +- .../k8s.io/apiserver/pkg/storage/tests/BUILD | 1 - .../pkg/storage/tests/cacher_test.go | 3 +- 15 files changed, 97 insertions(+), 46 deletions(-) diff --git a/cmd/kube-apiserver/app/options/BUILD b/cmd/kube-apiserver/app/options/BUILD index 3e8edb7302e..2f6f81f18c7 100644 --- a/cmd/kube-apiserver/app/options/BUILD +++ b/cmd/kube-apiserver/app/options/BUILD @@ -58,6 +58,7 @@ go_test( "//pkg/master/reconcilers:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 9bd6857692d..d559884fc82 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -25,15 +25,15 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/spf13/pflag" - "k8s.io/component-base/logs" - "k8s.io/apiserver/pkg/admission" apiserveroptions "k8s.io/apiserver/pkg/server/options" + "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/storagebackend" auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" restclient "k8s.io/client-go/rest" cliflag "k8s.io/component-base/cli/flag" + "k8s.io/component-base/logs" "k8s.io/component-base/metrics" kapi "k8s.io/kubernetes/pkg/apis/core" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" @@ -116,6 +116,7 @@ func TestAddFlags(t *testing.T) { "--request-timeout=2m", "--storage-backend=etcd3", "--service-cluster-ip-range=192.168.128.0/17", + "--lease-reuse-duration-seconds=100", } fs.Parse(args) @@ -155,12 +156,14 @@ func TestAddFlags(t *testing.T) { TrustedCAFile: "/var/run/kubernetes/etcdca.crt", CertFile: "/var/run/kubernetes/etcdce.crt", }, - Paging: true, - Prefix: "/registry", - CompactionInterval: storagebackend.DefaultCompactInterval, - CountMetricPollPeriod: time.Minute, - DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, - LeaseReuseDurationSeconds: storagebackend.DefaultLeaseReuseDurationSeconds, + Paging: true, + Prefix: "/registry", + CompactionInterval: storagebackend.DefaultCompactInterval, + CountMetricPollPeriod: time.Minute, + DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, + LeaseManagerConfig: etcd3.LeaseManagerConfig{ + ReuseDurationSeconds: 100, + }, }, DefaultStorageMediaType: "application/vnd.kubernetes.protobuf", DeleteCollectionWorkers: 1, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index d5fed31d027..4ed7d3d5359 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -181,7 +181,7 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval, "The interval of requests to poll etcd and update metric. 0 disables the metric collection") - fs.Int64Var(&s.StorageConfig.LeaseReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseReuseDurationSeconds, + fs.Int64Var(&s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 1c13b56b084..982455986f3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -35,7 +35,6 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go index b34c7fb0950..02c44b0dfd6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go @@ -22,8 +22,28 @@ import ( "time" "go.etcd.io/etcd/clientv3" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" + "k8s.io/klog/v2" ) +const ( + defaultLeaseReuseDurationSeconds = 60 + largeLeaseThreshold = 5000 +) + +// LeaseManagerConfig is configuration for creating a lease manager. +type LeaseManagerConfig struct { + // ReuseDurationSeconds specifies time in seconds that each lease is reused + ReuseDurationSeconds int64 +} + +// NewDefaultLeaseManagerConfig creates a LeaseManagerConfig with default values +func NewDefaultLeaseManagerConfig() LeaseManagerConfig { + return LeaseManagerConfig{ + ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, + } +} + // leaseManager is used to manage leases requested from etcd. If a new write // needs a lease that has similar expiration time to the previous one, the old // lease will be reused to reduce the overhead of etcd, since lease operations @@ -39,11 +59,12 @@ type leaseManager struct { // numbers. We use var instead of const for testing purposes. leaseReuseDurationSeconds int64 leaseReuseDurationPercent float64 + leaseAttachedObjectCount int64 } // newDefaultLeaseManager creates a new lease manager using default setting. -func newDefaultLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64) *leaseManager { - return newLeaseManager(client, leaseReuseDurationSeconds, 0.05) +func newDefaultLeaseManager(client *clientv3.Client, config LeaseManagerConfig) *leaseManager { + return newLeaseManager(client, config.ReuseDurationSeconds, 0.05) } // newLeaseManager creates a new lease manager with the number of buffered @@ -67,9 +88,15 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl) valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime) sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime) + + // We count all operations that happened in the same lease, regardless of success or failure. + // Currently each GetLease call only attach 1 object + l.leaseAttachedObjectCount++ + if valid && sufficient { return l.prevLeaseID, nil } + // request a lease with a little extra ttl from etcd ttl += reuseDurationSeconds lcr, err := l.client.Lease.Grant(ctx, ttl) @@ -79,6 +106,12 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI // cache the new lease id l.prevLeaseID = lcr.ID l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second) + // refresh count + if l.leaseAttachedObjectCount > largeLeaseThreshold { + klog.Infof("The object count for lease %x is large: %v", l.prevLeaseID, l.leaseAttachedObjectCount) + } + metrics.UpdateLeaseObjectCount(l.leaseAttachedObjectCount) + l.leaseAttachedObjectCount = 1 return lcr.ID, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go index 122453a72d5..722e0169cfb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go @@ -17,7 +17,6 @@ limitations under the License. package etcd3 import ( - "k8s.io/apiserver/pkg/storage/storagebackend" "testing" ) @@ -35,7 +34,7 @@ func TestGetReuseDurationSeconds(t *testing.T) { duration: 50, }, } - lm := newDefaultLeaseManager(nil, storagebackend.DefaultLeaseReuseDurationSeconds) + lm := newDefaultLeaseManager(nil, NewDefaultLeaseManagerConfig()) for i := 0; i < len(testCases); i++ { dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl) if dur != testCases[i].duration { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go index a0de7e18cec..4268d12bee1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go @@ -57,6 +57,15 @@ var ( }, []string{"endpoint"}, ) + etcdLeaseObjectCounts = compbasemetrics.NewHistogramVec( + &compbasemetrics.HistogramOpts{ + Name: "etcd_lease_object_counts", + Help: "Number of objects attached to a single etcd lease.", + Buckets: []float64{10, 50, 100, 500, 1000, 2500, 5000}, + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{}, + ) ) var registerMetrics sync.Once @@ -68,6 +77,7 @@ func Register() { legacyregistry.MustRegister(etcdRequestLatency) legacyregistry.MustRegister(objectCounts) legacyregistry.MustRegister(dbTotalSize) + legacyregistry.MustRegister(etcdLeaseObjectCounts) }) } @@ -95,3 +105,10 @@ func sinceInSeconds(start time.Time) float64 { func UpdateEtcdDbSize(ep string, size int64) { dbTotalSize.WithLabelValues(ep).Set(float64(size)) } + +// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric. +func UpdateLeaseObjectCount(count int64) { + // Currently we only store one previous lease, since all the events have the same ttl. + // See pkg/storage/etcd3/lease_manager.go + etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count)) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 8ee3c7cba7a..a9fd51be4dc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -83,11 +83,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool, leaseReuseDurationSeconds int64) storage.Interface { - return newStore(c, pagingEnabled, leaseReuseDurationSeconds, codec, prefix, transformer) +func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface { + return newStore(c, pagingEnabled, codec, prefix, transformer, leaseManagerConfig) } -func newStore(c *clientv3.Client, pagingEnabled bool, leaseReuseDurationSeconds int64, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store { versioner := APIObjectVersioner{} result := &store{ client: c, @@ -100,7 +100,7 @@ func newStore(c *clientv3.Client, pagingEnabled bool, leaseReuseDurationSeconds // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' pathPrefix: path.Join("/", prefix), watcher: newWatcher(c, codec, versioner, transformer), - leaseManager: newDefaultLeaseManager(c, leaseReuseDurationSeconds), + leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), } return result } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index b9ba9e847d7..4ab404a2836 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -47,7 +47,6 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/storagebackend" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -819,7 +818,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig()) ctx := context.Background() preset := []struct { @@ -896,8 +895,8 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) - disablePagingStore := newStore(cluster.RandClient(), false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig()) + disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1395,7 +1394,7 @@ func TestListContinuation(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) + store := newStore(etcdClient, true, codec, "", transformer, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1557,7 +1556,7 @@ func TestListContinuationWithFilter(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) + store := newStore(etcdClient, true, codec, "", transformer, NewDefaultLeaseManagerConfig()) ctx := context.Background() preset := []struct { @@ -1660,7 +1659,7 @@ func TestListInconsistentContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1808,7 +1807,9 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 // for testing purposes. See apimachinery/pkg/util/wait/wait.go - store := newStore(cluster.RandClient(), true, 1, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, LeaseManagerConfig{ + ReuseDurationSeconds: 1, + }) ctx := context.Background() return ctx, store, cluster } @@ -1850,7 +1851,7 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, configuredPrefix, transformer) + store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer, NewDefaultLeaseManagerConfig()) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } @@ -2017,7 +2018,7 @@ func TestConsistentList(t *testing.T) { transformer := &fancyTransformer{ transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, } - store := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) + store := newStore(cluster.RandClient(), true, codec, "", transformer, NewDefaultLeaseManagerConfig()) transformer.store = store for i := 0; i < 5; i++ { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index f7ced0d74a3..bf985591065 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -38,7 +38,6 @@ import ( "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/storagebackend" ) func TestWatch(t *testing.T) { @@ -226,13 +225,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")}) + invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}, NewDefaultLeaseManagerConfig()) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")}) + validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}, NewDefaultLeaseManagerConfig()) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD index 6a74f9eda4e..7949e8dda04 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD @@ -13,6 +13,7 @@ go_library( deps = [ "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 3a5a056e9ec..06e059461ce 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/server/egressselector" + "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/value" ) @@ -28,9 +29,8 @@ const ( StorageTypeUnset = "" StorageTypeETCD3 = "etcd3" - DefaultCompactInterval = 5 * time.Minute - DefaultDBMetricPollInterval = 30 * time.Second - DefaultLeaseReuseDurationSeconds = 60 + DefaultCompactInterval = 5 * time.Minute + DefaultDBMetricPollInterval = 30 * time.Second ) // TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to. @@ -75,17 +75,17 @@ type Config struct { CountMetricPollPeriod time.Duration // DBMetricPollInterval specifies how often should storage backend metric be updated. DBMetricPollInterval time.Duration - // LeaseReuseDurationSeconds specifies time in seconds that each lease is reused. See pkg/storage/etcd3/lease_manager.go - LeaseReuseDurationSeconds int64 + + LeaseManagerConfig etcd3.LeaseManagerConfig } func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { return &Config{ - Paging: true, - Prefix: prefix, - Codec: codec, - CompactionInterval: DefaultCompactInterval, - DBMetricPollInterval: DefaultDBMetricPollInterval, - LeaseReuseDurationSeconds: DefaultLeaseReuseDurationSeconds, + Paging: true, + Prefix: prefix, + Codec: codec, + CompactionInterval: DefaultCompactInterval, + DBMetricPollInterval: DefaultDBMetricPollInterval, + LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(), } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 16e873ed058..9cfd3c012c9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -249,7 +249,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e if transformer == nil { transformer = value.IdentityTransformer } - return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging, c.LeaseReuseDurationSeconds), destroyFunc, nil + return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD index 37cd06ae1a2..fdd648ff6a5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD @@ -27,7 +27,6 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 1092e9bb212..fef527d3b3d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -46,7 +46,6 @@ import ( cacherstorage "k8s.io/apiserver/pkg/storage/cacher" "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" - "k8s.io/apiserver/pkg/storage/storagebackend" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -102,7 +101,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true, storagebackend.DefaultLeaseReuseDurationSeconds) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig()) return server, storage } From 3820a5d819582d4f6b4652757a3cc6c1a04692b4 Mon Sep 17 00:00:00 2001 From: Ling Samuel Date: Mon, 7 Dec 2020 10:45:44 +0800 Subject: [PATCH 3/3] api-server add --lease-max-object-count Signed-off-by: Ling Samuel --- .../app/options/options_test.go | 1 + .../pkg/storage/etcd3/lease_manager.go | 32 +++++++------ .../apiserver/pkg/storage/etcd3/store_test.go | 45 +++++++++++++++++++ 3 files changed, 64 insertions(+), 14 deletions(-) diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index d559884fc82..b6f3c0ea8a0 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -163,6 +163,7 @@ func TestAddFlags(t *testing.T) { DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, LeaseManagerConfig: etcd3.LeaseManagerConfig{ ReuseDurationSeconds: 100, + MaxObjectCount: 1000, }, }, DefaultStorageMediaType: "application/vnd.kubernetes.protobuf", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go index 02c44b0dfd6..a05f293df3f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go @@ -23,24 +23,26 @@ import ( "go.etcd.io/etcd/clientv3" "k8s.io/apiserver/pkg/storage/etcd3/metrics" - "k8s.io/klog/v2" ) const ( defaultLeaseReuseDurationSeconds = 60 - largeLeaseThreshold = 5000 + defaultLeaseMaxObjectCount = 1000 ) // LeaseManagerConfig is configuration for creating a lease manager. type LeaseManagerConfig struct { // ReuseDurationSeconds specifies time in seconds that each lease is reused ReuseDurationSeconds int64 + // MaxObjectCount specifies how many objects that a lease can attach + MaxObjectCount int64 } // NewDefaultLeaseManagerConfig creates a LeaseManagerConfig with default values func NewDefaultLeaseManagerConfig() LeaseManagerConfig { return LeaseManagerConfig{ ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, + MaxObjectCount: defaultLeaseMaxObjectCount, } } @@ -57,24 +59,29 @@ type leaseManager struct { // The period of time in seconds and percent of TTL that each lease is // reused. The minimum of them is used to avoid unreasonably large // numbers. We use var instead of const for testing purposes. - leaseReuseDurationSeconds int64 - leaseReuseDurationPercent float64 - leaseAttachedObjectCount int64 + leaseReuseDurationSeconds int64 + leaseReuseDurationPercent float64 + leaseMaxAttachedObjectCount int64 + leaseAttachedObjectCount int64 } // newDefaultLeaseManager creates a new lease manager using default setting. func newDefaultLeaseManager(client *clientv3.Client, config LeaseManagerConfig) *leaseManager { - return newLeaseManager(client, config.ReuseDurationSeconds, 0.05) + if config.MaxObjectCount <= 0 { + config.MaxObjectCount = defaultLeaseMaxObjectCount + } + return newLeaseManager(client, config.ReuseDurationSeconds, 0.05, config.MaxObjectCount) } // newLeaseManager creates a new lease manager with the number of buffered // leases, lease reuse duration in seconds and percentage. The percentage // value x means x*100%. -func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager { +func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64, maxObjectCount int64) *leaseManager { return &leaseManager{ - client: client, - leaseReuseDurationSeconds: leaseReuseDurationSeconds, - leaseReuseDurationPercent: leaseReuseDurationPercent, + client: client, + leaseReuseDurationSeconds: leaseReuseDurationSeconds, + leaseReuseDurationPercent: leaseReuseDurationPercent, + leaseMaxAttachedObjectCount: maxObjectCount, } } @@ -93,7 +100,7 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI // Currently each GetLease call only attach 1 object l.leaseAttachedObjectCount++ - if valid && sufficient { + if valid && sufficient && l.leaseAttachedObjectCount <= l.leaseMaxAttachedObjectCount { return l.prevLeaseID, nil } @@ -107,9 +114,6 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI l.prevLeaseID = lcr.ID l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second) // refresh count - if l.leaseAttachedObjectCount > largeLeaseThreshold { - klog.Infof("The object count for lease %x is large: %v", l.prevLeaseID, l.leaseAttachedObjectCount) - } metrics.UpdateLeaseObjectCount(l.leaseAttachedObjectCount) l.leaseAttachedObjectCount = 1 return lcr.ID, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 4ab404a2836..6d743ac70b2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1809,6 +1809,7 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { // for testing purposes. See apimachinery/pkg/util/wait/wait.go store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, LeaseManagerConfig{ ReuseDurationSeconds: 1, + MaxObjectCount: defaultLeaseMaxObjectCount, }) ctx := context.Background() return ctx, store, cluster @@ -2120,3 +2121,47 @@ func TestCount(t *testing.T) { t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot) } } + +func TestLeaseMaxObjectCount(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, LeaseManagerConfig{ + ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, + MaxObjectCount: 2, + }) + ctx := context.Background() + defer cluster.Terminate(t) + + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}} + out := &example.Pod{} + + testCases := []struct { + key string + expectAttachedCount int64 + }{ + { + key: "testkey1", + expectAttachedCount: 1, + }, + { + key: "testkey2", + expectAttachedCount: 2, + }, + { + key: "testkey3", + // We assume each time has 1 object attached to the lease + // so after granting a new lease, the recorded count is set to 1 + expectAttachedCount: 1, + }, + } + + for _, tc := range testCases { + err := store.Create(ctx, tc.key, obj, out, 120) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + if store.leaseManager.leaseAttachedObjectCount != tc.expectAttachedCount { + t.Errorf("Lease manager recorded count %v should be %v", store.leaseManager.leaseAttachedObjectCount, tc.expectAttachedCount) + } + } +}