mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-09 00:34:10 -04:00
Merge pull request #100450 from mborsz/automated-cherry-pick-of-#97009-#97480-#98257-upstream-release-1.19
[1.19] Automated cherry pick of fixes for "large leases overload event etcd" issue (96836)
This commit is contained in:
commit
75aca05027
13 changed files with 145 additions and 39 deletions
|
|
@ -58,6 +58,7 @@ go_test(
|
|||
"//pkg/master/reconcilers:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
|
||||
|
|
|
|||
|
|
@ -25,15 +25,15 @@ import (
|
|||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/spf13/pflag"
|
||||
"k8s.io/component-base/logs"
|
||||
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
apiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
||||
audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/component-base/logs"
|
||||
"k8s.io/component-base/metrics"
|
||||
kapi "k8s.io/kubernetes/pkg/apis/core"
|
||||
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
|
||||
|
|
@ -116,6 +116,7 @@ func TestAddFlags(t *testing.T) {
|
|||
"--request-timeout=2m",
|
||||
"--storage-backend=etcd3",
|
||||
"--service-cluster-ip-range=192.168.128.0/17",
|
||||
"--lease-reuse-duration-seconds=100",
|
||||
}
|
||||
fs.Parse(args)
|
||||
|
||||
|
|
@ -160,6 +161,10 @@ func TestAddFlags(t *testing.T) {
|
|||
CompactionInterval: storagebackend.DefaultCompactInterval,
|
||||
CountMetricPollPeriod: time.Minute,
|
||||
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
|
||||
LeaseManagerConfig: etcd3.LeaseManagerConfig{
|
||||
ReuseDurationSeconds: 100,
|
||||
MaxObjectCount: 1000,
|
||||
},
|
||||
},
|
||||
DefaultStorageMediaType: "application/vnd.kubernetes.protobuf",
|
||||
DeleteCollectionWorkers: 1,
|
||||
|
|
|
|||
|
|
@ -180,6 +180,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
|||
|
||||
fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval,
|
||||
"The interval of requests to poll etcd and update metric. 0 disables the metric collection")
|
||||
|
||||
fs.Int64Var(&s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds,
|
||||
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
|
||||
|
|
|
|||
|
|
@ -22,8 +22,30 @@ import (
|
|||
"time"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultLeaseReuseDurationSeconds = 60
|
||||
defaultLeaseMaxObjectCount = 1000
|
||||
)
|
||||
|
||||
// LeaseManagerConfig is configuration for creating a lease manager.
|
||||
type LeaseManagerConfig struct {
|
||||
// ReuseDurationSeconds specifies time in seconds that each lease is reused
|
||||
ReuseDurationSeconds int64
|
||||
// MaxObjectCount specifies how many objects that a lease can attach
|
||||
MaxObjectCount int64
|
||||
}
|
||||
|
||||
// NewDefaultLeaseManagerConfig creates a LeaseManagerConfig with default values
|
||||
func NewDefaultLeaseManagerConfig() LeaseManagerConfig {
|
||||
return LeaseManagerConfig{
|
||||
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
|
||||
MaxObjectCount: defaultLeaseMaxObjectCount,
|
||||
}
|
||||
}
|
||||
|
||||
// leaseManager is used to manage leases requested from etcd. If a new write
|
||||
// needs a lease that has similar expiration time to the previous one, the old
|
||||
// lease will be reused to reduce the overhead of etcd, since lease operations
|
||||
|
|
@ -37,34 +59,32 @@ type leaseManager struct {
|
|||
// The period of time in seconds and percent of TTL that each lease is
|
||||
// reused. The minimum of them is used to avoid unreasonably large
|
||||
// numbers. We use var instead of const for testing purposes.
|
||||
leaseReuseDurationSeconds int64
|
||||
leaseReuseDurationPercent float64
|
||||
leaseReuseDurationSeconds int64
|
||||
leaseReuseDurationPercent float64
|
||||
leaseMaxAttachedObjectCount int64
|
||||
leaseAttachedObjectCount int64
|
||||
}
|
||||
|
||||
// newDefaultLeaseManager creates a new lease manager using default setting.
|
||||
func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
|
||||
return newLeaseManager(client, 60, 0.05)
|
||||
func newDefaultLeaseManager(client *clientv3.Client, config LeaseManagerConfig) *leaseManager {
|
||||
if config.MaxObjectCount <= 0 {
|
||||
config.MaxObjectCount = defaultLeaseMaxObjectCount
|
||||
}
|
||||
return newLeaseManager(client, config.ReuseDurationSeconds, 0.05, config.MaxObjectCount)
|
||||
}
|
||||
|
||||
// newLeaseManager creates a new lease manager with the number of buffered
|
||||
// leases, lease reuse duration in seconds and percentage. The percentage
|
||||
// value x means x*100%.
|
||||
func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager {
|
||||
func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64, maxObjectCount int64) *leaseManager {
|
||||
return &leaseManager{
|
||||
client: client,
|
||||
leaseReuseDurationSeconds: leaseReuseDurationSeconds,
|
||||
leaseReuseDurationPercent: leaseReuseDurationPercent,
|
||||
client: client,
|
||||
leaseReuseDurationSeconds: leaseReuseDurationSeconds,
|
||||
leaseReuseDurationPercent: leaseReuseDurationPercent,
|
||||
leaseMaxAttachedObjectCount: maxObjectCount,
|
||||
}
|
||||
}
|
||||
|
||||
// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
|
||||
// reduce the extra lease duration to avoid unnecessary timeout in testing.
|
||||
func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
|
||||
l.leaseMu.Lock()
|
||||
defer l.leaseMu.Unlock()
|
||||
l.leaseReuseDurationSeconds = duration
|
||||
}
|
||||
|
||||
// GetLease returns a lease based on requested ttl: if the cached previous
|
||||
// lease can be reused, reuse it; otherwise request a new one from etcd.
|
||||
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
|
||||
|
|
@ -75,9 +95,15 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI
|
|||
reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
|
||||
valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
|
||||
sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
|
||||
if valid && sufficient {
|
||||
|
||||
// We count all operations that happened in the same lease, regardless of success or failure.
|
||||
// Currently each GetLease call only attach 1 object
|
||||
l.leaseAttachedObjectCount++
|
||||
|
||||
if valid && sufficient && l.leaseAttachedObjectCount <= l.leaseMaxAttachedObjectCount {
|
||||
return l.prevLeaseID, nil
|
||||
}
|
||||
|
||||
// request a lease with a little extra ttl from etcd
|
||||
ttl += reuseDurationSeconds
|
||||
lcr, err := l.client.Lease.Grant(ctx, ttl)
|
||||
|
|
@ -87,6 +113,9 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI
|
|||
// cache the new lease id
|
||||
l.prevLeaseID = lcr.ID
|
||||
l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
|
||||
// refresh count
|
||||
metrics.UpdateLeaseObjectCount(l.leaseAttachedObjectCount)
|
||||
l.leaseAttachedObjectCount = 1
|
||||
return lcr.ID, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ func TestGetReuseDurationSeconds(t *testing.T) {
|
|||
duration: 50,
|
||||
},
|
||||
}
|
||||
lm := newDefaultLeaseManager(nil)
|
||||
lm := newDefaultLeaseManager(nil, NewDefaultLeaseManagerConfig())
|
||||
for i := 0; i < len(testCases); i++ {
|
||||
dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl)
|
||||
if dur != testCases[i].duration {
|
||||
|
|
|
|||
|
|
@ -57,6 +57,15 @@ var (
|
|||
},
|
||||
[]string{"endpoint"},
|
||||
)
|
||||
etcdLeaseObjectCounts = compbasemetrics.NewHistogramVec(
|
||||
&compbasemetrics.HistogramOpts{
|
||||
Name: "etcd_lease_object_counts",
|
||||
Help: "Number of objects attached to a single etcd lease.",
|
||||
Buckets: []float64{10, 50, 100, 500, 1000, 2500, 5000},
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{},
|
||||
)
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
|
|
@ -68,6 +77,7 @@ func Register() {
|
|||
legacyregistry.MustRegister(etcdRequestLatency)
|
||||
legacyregistry.MustRegister(objectCounts)
|
||||
legacyregistry.MustRegister(dbTotalSize)
|
||||
legacyregistry.MustRegister(etcdLeaseObjectCounts)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -95,3 +105,10 @@ func sinceInSeconds(start time.Time) float64 {
|
|||
func UpdateEtcdDbSize(ep string, size int64) {
|
||||
dbTotalSize.WithLabelValues(ep).Set(float64(size))
|
||||
}
|
||||
|
||||
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
|
||||
func UpdateLeaseObjectCount(count int64) {
|
||||
// Currently we only store one previous lease, since all the events have the same ttl.
|
||||
// See pkg/storage/etcd3/lease_manager.go
|
||||
etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,11 +83,11 @@ type objState struct {
|
|||
}
|
||||
|
||||
// New returns an etcd3 implementation of storage.Interface.
|
||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
|
||||
return newStore(c, pagingEnabled, codec, prefix, transformer)
|
||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
|
||||
return newStore(c, pagingEnabled, codec, prefix, transformer, leaseManagerConfig)
|
||||
}
|
||||
|
||||
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
|
||||
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
|
||||
versioner := APIObjectVersioner{}
|
||||
result := &store{
|
||||
client: c,
|
||||
|
|
@ -100,7 +100,7 @@ func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefi
|
|||
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
watcher: newWatcher(c, codec, versioner, transformer),
|
||||
leaseManager: newDefaultLeaseManager(c),
|
||||
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
|
|||
|
|
@ -818,7 +818,7 @@ func TestTransformationFailure(t *testing.T) {
|
|||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
preset := []struct {
|
||||
|
|
@ -895,8 +895,8 @@ func TestList(t *testing.T) {
|
|||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig())
|
||||
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
|
|
@ -1394,7 +1394,7 @@ func TestListContinuation(t *testing.T) {
|
|||
etcdClient := cluster.RandClient()
|
||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||
etcdClient.KV = recorder
|
||||
store := newStore(etcdClient, true, codec, "", transformer)
|
||||
store := newStore(etcdClient, true, codec, "", transformer, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
|
|
@ -1556,7 +1556,7 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||
etcdClient := cluster.RandClient()
|
||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||
etcdClient.KV = recorder
|
||||
store := newStore(etcdClient, true, codec, "", transformer)
|
||||
store := newStore(etcdClient, true, codec, "", transformer, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
preset := []struct {
|
||||
|
|
@ -1659,7 +1659,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
|
|
@ -1804,12 +1804,14 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
// As 30s is the default timeout for testing in glboal configuration,
|
||||
// we cannot wait longer than that in a single time: change it to 10
|
||||
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
|
||||
store.leaseManager.setLeaseReuseDurationSeconds(1)
|
||||
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, LeaseManagerConfig{
|
||||
ReuseDurationSeconds: 1,
|
||||
MaxObjectCount: defaultLeaseMaxObjectCount,
|
||||
})
|
||||
ctx := context.Background()
|
||||
return ctx, store, cluster
|
||||
}
|
||||
|
||||
|
|
@ -1850,7 +1852,7 @@ func TestPrefix(t *testing.T) {
|
|||
"/registry": "/registry",
|
||||
}
|
||||
for configuredPrefix, effectivePrefix := range testcases {
|
||||
store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer)
|
||||
store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer, NewDefaultLeaseManagerConfig())
|
||||
if store.pathPrefix != effectivePrefix {
|
||||
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
|
||||
}
|
||||
|
|
@ -2017,7 +2019,7 @@ func TestConsistentList(t *testing.T) {
|
|||
transformer := &fancyTransformer{
|
||||
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
|
||||
}
|
||||
store := newStore(cluster.RandClient(), true, codec, "", transformer)
|
||||
store := newStore(cluster.RandClient(), true, codec, "", transformer, NewDefaultLeaseManagerConfig())
|
||||
transformer.store = store
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
|
|
@ -2119,3 +2121,47 @@ func TestCount(t *testing.T) {
|
|||
t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaseMaxObjectCount(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, LeaseManagerConfig{
|
||||
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
|
||||
MaxObjectCount: 2,
|
||||
})
|
||||
ctx := context.Background()
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}}
|
||||
out := &example.Pod{}
|
||||
|
||||
testCases := []struct {
|
||||
key string
|
||||
expectAttachedCount int64
|
||||
}{
|
||||
{
|
||||
key: "testkey1",
|
||||
expectAttachedCount: 1,
|
||||
},
|
||||
{
|
||||
key: "testkey2",
|
||||
expectAttachedCount: 2,
|
||||
},
|
||||
{
|
||||
key: "testkey3",
|
||||
// We assume each time has 1 object attached to the lease
|
||||
// so after granting a new lease, the recorded count is set to 1
|
||||
expectAttachedCount: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
err := store.Create(ctx, tc.key, obj, out, 120)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
if store.leaseManager.leaseAttachedObjectCount != tc.expectAttachedCount {
|
||||
t.Errorf("Lease manager recorded count %v should be %v", store.leaseManager.leaseAttachedObjectCount, tc.expectAttachedCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) {
|
|||
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
|
||||
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
|
||||
validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}, NewDefaultLeaseManagerConfig())
|
||||
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ go_library(
|
|||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
)
|
||||
|
||||
|
|
@ -74,6 +75,8 @@ type Config struct {
|
|||
CountMetricPollPeriod time.Duration
|
||||
// DBMetricPollInterval specifies how often should storage backend metric be updated.
|
||||
DBMetricPollInterval time.Duration
|
||||
|
||||
LeaseManagerConfig etcd3.LeaseManagerConfig
|
||||
}
|
||||
|
||||
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
||||
|
|
@ -83,5 +86,6 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
|||
Codec: codec,
|
||||
CompactionInterval: DefaultCompactInterval,
|
||||
DBMetricPollInterval: DefaultDBMetricPollInterval,
|
||||
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -249,7 +249,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
|||
if transformer == nil {
|
||||
transformer = value.IdentityTransformer
|
||||
}
|
||||
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
|
||||
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
|
||||
}
|
||||
|
||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
|
|||
|
||||
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
||||
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true)
|
||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig())
|
||||
return server, storage
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue