Merge pull request #138638 from shadowofs/refactor/storage-watch-test-structure

Group storage watch tests
This commit is contained in:
Kubernetes Prow Robot 2026-04-28 17:54:02 +05:30 committed by GitHub
commit 9df1be6658
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 176 additions and 214 deletions

View file

@ -399,107 +399,92 @@ func TestKeySchema(t *testing.T) {
}
func TestWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatch(ctx, t, cacher)
}
func TestWatchFromZero(t *testing.T) {
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactWatch(cacher, server.V3Client.Client))
}
func TestDeleteTriggerWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestDeleteTriggerWatch(ctx, t, cacher)
}
func TestWatchFromNonZero(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromNonZero(ctx, t, cacher)
}
func TestDelayedWatchDelivery(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestDelayedWatchDelivery(ctx, t, cacher)
}
func TestWatchError(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
}
func TestWatchContextCancel(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
}
func TestWatcherTimeout(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatcherTimeout(ctx, t, cacher)
}
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, cacher)
}
func TestWatchInitializationSignal(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchInitializationSignal(ctx, t, cacher)
}
func TestClusterScopedWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withNodeNameAndNamespaceIndex)
t.Cleanup(terminate)
storagetesting.RunTestClusterScopedWatch(ctx, t, cacher)
}
func TestNamespaceScopedWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withNodeNameAndNamespaceIndex)
t.Cleanup(terminate)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher)
}
func TestWatchDispatchBookmarkEvents(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, cacher, true)
}
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx, t, cacher)
}
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
ctx, store, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}
func TestWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
}
func TestWatchSemanticInitialEventsExtended(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store)
}
func TestWatchListMatchSingle(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchListMatchSingle(context.TODO(), t, store)
t.Run("Watch", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatch(ctx, t, cacher)
})
t.Run("WatchFromZero", func(t *testing.T) {
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactWatch(cacher, server.V3Client.Client))
})
t.Run("DeleteTriggerWatch", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestDeleteTriggerWatch(ctx, t, cacher)
})
t.Run("WatchFromNonZero", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromNonZero(ctx, t, cacher)
})
t.Run("DelayedWatchDelivery", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestDelayedWatchDelivery(ctx, t, cacher)
})
t.Run("WatchError", func(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
})
t.Run("WatchContextCancel", func(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
})
t.Run("WatcherTimeout", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatcherTimeout(ctx, t, cacher)
})
t.Run("WatchDeleteEventObjectHaveLatestRV", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, cacher)
})
t.Run("WatchInitializationSignal", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchInitializationSignal(ctx, t, cacher)
})
t.Run("ClusterScopedWatch", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withNodeNameAndNamespaceIndex)
t.Cleanup(terminate)
storagetesting.RunTestClusterScopedWatch(ctx, t, cacher)
})
t.Run("NamespaceScopedWatch", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withNodeNameAndNamespaceIndex)
t.Cleanup(terminate)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher)
})
t.Run("WatchDispatchBookmarkEvents", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, cacher, true)
})
t.Run("WatchBookmarksWithCorrectResourceVersion", func(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx, t, cacher)
})
t.Run("SendInitialEventsBackwardCompatibility", func(t *testing.T) {
ctx, store, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
})
t.Run("WatchSemantics", func(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
})
t.Run("WatchSemanticInitialEventsExtended", func(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store)
})
t.Run("WatchListMatchSingle", func(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchListMatchSingle(context.TODO(), t, store)
})
}
// ===================================================

View file

@ -42,121 +42,98 @@ import (
)
func TestWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatch(ctx, t, store)
}
t.Run("Watch", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatch(ctx, t, store)
})
t.Run("ClusterScopedWatch", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
})
t.Run("NamespaceScopedWatch", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
})
t.Run("DeleteTriggerWatch", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
})
t.Run("WatchFromZero", func(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(store, client.Client))
})
t.Run("WatchFromNonZero", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchFromNonZero(ctx, t, store)
})
t.Run("DelayedWatchDelivery", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestDelayedWatchDelivery(ctx, t, store)
})
t.Run("WatchError", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store})
})
t.Run("WatchContextCancel", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchContextCancel(ctx, t, store)
})
t.Run("WatcherTimeout", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatcherTimeout(ctx, t, store)
})
t.Run("WatchDeleteEventObjectHaveLatestRV", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)
})
t.Run("WatchInitializationSignal", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchInitializationSignal(ctx, t, store)
})
t.Run("ProgressNotify", func(t *testing.T) {
clusterConfig := testserver.NewTestConfig(t)
clusterConfig.WatchProgressNotifyInterval = time.Second
ctx, store, client := testSetup(t, withClientConfig(clusterConfig))
func TestClusterScopedWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
}
storagetesting.RunOptionalTestProgressNotify(ctx, t, store, increaseRVFunc(client.Client))
})
t.Run("WatchWithUnsafeDelete", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AllowUnsafeMalformedObjectDeletion, true)
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchWithUnsafeDelete(ctx, t, &storeWithCorruptedTransformer{store})
})
t.Run("WatchDispatchBookmarkEvents", func(t *testing.T) {
clusterConfig := testserver.NewTestConfig(t)
clusterConfig.WatchProgressNotifyInterval = time.Second
ctx, store, _ := testSetup(t, withClientConfig(clusterConfig))
func TestNamespaceScopedWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
}
func TestDeleteTriggerWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
}
func TestWatchFromZero(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(store, client.Client))
}
// TestWatchFromNonZero tests that
// - watch from non-0 should just watch changes after given version
func TestWatchFromNoneZero(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchFromNonZero(ctx, t, store)
}
func TestDelayedWatchDelivery(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestDelayedWatchDelivery(ctx, t, store)
}
func TestWatchError(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store})
}
func TestWatchContextCancel(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchContextCancel(ctx, t, store)
}
func TestWatcherTimeout(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatcherTimeout(ctx, t, store)
}
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)
}
func TestWatchInitializationSignal(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchInitializationSignal(ctx, t, store)
}
func TestProgressNotify(t *testing.T) {
clusterConfig := testserver.NewTestConfig(t)
clusterConfig.WatchProgressNotifyInterval = time.Second
ctx, store, client := testSetup(t, withClientConfig(clusterConfig))
storagetesting.RunOptionalTestProgressNotify(ctx, t, store, increaseRVFunc(client.Client))
}
func TestWatchWithUnsafeDelete(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AllowUnsafeMalformedObjectDeletion, true)
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchWithUnsafeDelete(ctx, t, &storeWithCorruptedTransformer{store})
}
// TestWatchDispatchBookmarkEvents makes sure that
// setting allowWatchBookmarks query param against
// etcd implementation doesn't have any effect.
func TestWatchDispatchBookmarkEvents(t *testing.T) {
clusterConfig := testserver.NewTestConfig(t)
clusterConfig.WatchProgressNotifyInterval = time.Second
ctx, store, _ := testSetup(t, withClientConfig(clusterConfig))
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false)
}
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}
func TestEtcdWatchSemantics(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemantics(ctx, t, store)
}
func TestEtcdWatchSemanticsWithConcurrentDecode(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true)
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemantics(ctx, t, store)
}
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
}
func TestWatchListMatchSingle(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchListMatchSingle(ctx, t, store)
}
func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, &storeWithPrefixTransformer{store})
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false)
})
t.Run("SendInitialEventsBackwardCompatibility", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
})
t.Run("WatchSemantics", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemantics(ctx, t, store)
})
t.Run("WatchSemanticsWithConcurrentDecode", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true)
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemantics(ctx, t, store)
})
t.Run("WatchSemanticInitialEventsExtended", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
})
t.Run("WatchListMatchSingle", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchListMatchSingle(ctx, t, store)
})
t.Run("WatchErrorEventIsBlockingFurtherEvent", func(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, &storeWithPrefixTransformer{store})
})
}
// =======================================================================