mirror of
https://github.com/traefik/traefik.git
synced 2026-06-13 10:40:21 -04:00
Update Gateway API statuses once routing config is built
Some checks are pending
CodeQL / Analyze (push) Waiting to run
Build and Publish Documentation / Doc Process (push) Waiting to run
Build experimental image on branch / build-webui (push) Waiting to run
Build experimental image on branch / Build experimental image on branch (push) Waiting to run
Some checks are pending
CodeQL / Analyze (push) Waiting to run
Build and Publish Documentation / Doc Process (push) Waiting to run
Build experimental image on branch / build-webui (push) Waiting to run
Build experimental image on branch / Build experimental image on branch (push) Waiting to run
Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
This commit is contained in:
parent
51b9a37615
commit
bcf768ee09
9 changed files with 400 additions and 185 deletions
|
|
@ -727,14 +727,6 @@ func (c *clientWrapper) UpdateBackendTLSPolicyStatus(ctx context.Context, policy
|
|||
ancestorStatuses = append(ancestorStatuses, ancestorStatus)
|
||||
continue
|
||||
}
|
||||
|
||||
// Keep statuses added by Traefik for other ancestors.
|
||||
// A BackendTLSPolicy can target services attached to different listeners.
|
||||
if !slices.ContainsFunc(status.Ancestors, func(s gatev1.PolicyAncestorStatus) bool {
|
||||
return reflect.DeepEqual(s.AncestorRef, ancestorStatus.AncestorRef)
|
||||
}) {
|
||||
ancestorStatuses = append(ancestorStatuses, ancestorStatus)
|
||||
}
|
||||
}
|
||||
|
||||
if len(ancestorStatuses) > 16 {
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
)
|
||||
|
||||
// TODO: as described in the specification https://gateway-api.sigs.k8s.io/reference/spec/#gateway.networking.k8s.io%2fv1.GRPCRoute, we should check for hostname conflicts between HTTP and GRPC routes.
|
||||
func (p *Provider) loadGRPCRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration) {
|
||||
func (p *Provider) loadGRPCRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration, statusReport *statusReport) {
|
||||
routes, err := p.client.ListGRPCRoutes()
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).Msg("Unable to list GRPCRoutes")
|
||||
|
|
@ -40,9 +40,8 @@ func (p *Provider) loadGRPCRoutes(ctx context.Context, gatewayListeners []gatewa
|
|||
continue
|
||||
}
|
||||
|
||||
var parentStatuses []gatev1.RouteParentStatus
|
||||
for _, parentRef := range route.Spec.ParentRefs {
|
||||
parentStatus := &gatev1.RouteParentStatus{
|
||||
parentStatus := gatev1.RouteParentStatus{
|
||||
ParentRef: parentRef,
|
||||
ControllerName: controllerName,
|
||||
Conditions: []metav1.Condition{
|
||||
|
|
@ -78,7 +77,7 @@ func (p *Provider) loadGRPCRoutes(ctx context.Context, gatewayListeners []gatewa
|
|||
}
|
||||
}
|
||||
|
||||
routeConf, resolveRefCondition := p.loadGRPCRoute(logger.WithContext(ctx), listener, route, hostnames)
|
||||
routeConf, resolveRefCondition := p.loadGRPCRoute(logger.WithContext(ctx), listener, route, hostnames, statusReport)
|
||||
if accepted && listener.Attached {
|
||||
mergeHTTPConfiguration(routeConf, conf)
|
||||
}
|
||||
|
|
@ -86,23 +85,12 @@ func (p *Provider) loadGRPCRoutes(ctx context.Context, gatewayListeners []gatewa
|
|||
parentStatus.Conditions = upsertRouteConditionResolvedRefs(parentStatus.Conditions, resolveRefCondition)
|
||||
}
|
||||
|
||||
parentStatuses = append(parentStatuses, *parentStatus)
|
||||
}
|
||||
|
||||
status := gatev1.GRPCRouteStatus{
|
||||
RouteStatus: gatev1.RouteStatus{
|
||||
Parents: parentStatuses,
|
||||
},
|
||||
}
|
||||
if err := p.client.UpdateGRPCRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, status); err != nil {
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update GRPCRoute status")
|
||||
statusReport.RecordGRPCRouteStatus(ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, parentStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) loadGRPCRoute(ctx context.Context, listener gatewayListener, route *gatev1.GRPCRoute, hostnames []gatev1.Hostname) (*dynamic.Configuration, metav1.Condition) {
|
||||
func (p *Provider) loadGRPCRoute(ctx context.Context, listener gatewayListener, route *gatev1.GRPCRoute, hostnames []gatev1.Hostname, statusReport *statusReport) (*dynamic.Configuration, metav1.Condition) {
|
||||
conf := &dynamic.Configuration{
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: make(map[string]*dynamic.Router),
|
||||
|
|
@ -169,7 +157,7 @@ func (p *Provider) loadGRPCRoute(ctx context.Context, listener gatewayListener,
|
|||
|
||||
default:
|
||||
var serviceCondition *metav1.Condition
|
||||
router.Service, serviceCondition = p.loadGRPCService(ctx, listener, conf, routerName, routeRule, route)
|
||||
router.Service, serviceCondition = p.loadGRPCService(listener, conf, routerName, routeRule, route, statusReport)
|
||||
if serviceCondition != nil {
|
||||
condition = *serviceCondition
|
||||
}
|
||||
|
|
@ -182,7 +170,7 @@ func (p *Provider) loadGRPCRoute(ctx context.Context, listener gatewayListener,
|
|||
return conf, condition
|
||||
}
|
||||
|
||||
func (p *Provider) loadGRPCService(ctx context.Context, listener gatewayListener, conf *dynamic.Configuration, routeKey string, routeRule gatev1.GRPCRouteRule, route *gatev1.GRPCRoute) (string, *metav1.Condition) {
|
||||
func (p *Provider) loadGRPCService(listener gatewayListener, conf *dynamic.Configuration, routeKey string, routeRule gatev1.GRPCRouteRule, route *gatev1.GRPCRoute, statusReport *statusReport) (string, *metav1.Condition) {
|
||||
name := routeKey + "-wrr"
|
||||
if _, ok := conf.HTTP.Services[name]; ok {
|
||||
return name, nil
|
||||
|
|
@ -191,7 +179,7 @@ func (p *Provider) loadGRPCService(ctx context.Context, listener gatewayListener
|
|||
var wrr dynamic.WeightedRoundRobin
|
||||
var condition *metav1.Condition
|
||||
for _, backendRef := range routeRule.BackendRefs {
|
||||
svcName, svc, errCondition := p.loadGRPCBackendRef(ctx, listener, conf, route, backendRef)
|
||||
svcName, svc, errCondition := p.loadGRPCBackendRef(listener, conf, route, backendRef, statusReport)
|
||||
weight := ptr.To(int(ptr.Deref(backendRef.Weight, 1)))
|
||||
if errCondition != nil {
|
||||
condition = errCondition
|
||||
|
|
@ -220,7 +208,7 @@ func (p *Provider) loadGRPCService(ctx context.Context, listener gatewayListener
|
|||
return name, condition
|
||||
}
|
||||
|
||||
func (p *Provider) loadGRPCBackendRef(ctx context.Context, listener gatewayListener, conf *dynamic.Configuration, route *gatev1.GRPCRoute, backendRef gatev1.GRPCBackendRef) (string, *dynamic.Service, *metav1.Condition) {
|
||||
func (p *Provider) loadGRPCBackendRef(listener gatewayListener, conf *dynamic.Configuration, route *gatev1.GRPCRoute, backendRef gatev1.GRPCBackendRef, statusReport *statusReport) (string, *dynamic.Service, *metav1.Condition) {
|
||||
kind := ptr.Deref(backendRef.Kind, kindService)
|
||||
|
||||
group := groupCore
|
||||
|
|
@ -272,7 +260,7 @@ func (p *Provider) loadGRPCBackendRef(ctx context.Context, listener gatewayListe
|
|||
portStr := strconv.FormatInt(int64(port), 10)
|
||||
serviceName = provider.Normalize(serviceName + "-" + portStr + "-grpc")
|
||||
|
||||
lb, st, errCondition := p.loadGRPCServers(ctx, namespace, route, backendRef, listener)
|
||||
lb, st, errCondition := p.loadGRPCServers(namespace, route, backendRef, listener, statusReport)
|
||||
if errCondition != nil {
|
||||
return serviceName, nil, errCondition
|
||||
}
|
||||
|
|
@ -331,7 +319,7 @@ func (p *Provider) loadGRPCMiddlewares(conf *dynamic.Configuration, namespace, r
|
|||
return middlewareNames, nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadGRPCServers(ctx context.Context, namespace string, route *gatev1.GRPCRoute, backendRef gatev1.GRPCBackendRef, listener gatewayListener) (*dynamic.ServersLoadBalancer, *dynamic.ServersTransport, *metav1.Condition) {
|
||||
func (p *Provider) loadGRPCServers(namespace string, route *gatev1.GRPCRoute, backendRef gatev1.GRPCBackendRef, listener gatewayListener, statusReport *statusReport) (*dynamic.ServersLoadBalancer, *dynamic.ServersTransport, *metav1.Condition) {
|
||||
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef.BackendRef)
|
||||
if err != nil {
|
||||
return nil, nil, &metav1.Condition{
|
||||
|
|
@ -408,12 +396,7 @@ func (p *Provider) loadGRPCServers(ctx context.Context, namespace string, route
|
|||
},
|
||||
)
|
||||
|
||||
status := gatev1.PolicyStatus{
|
||||
Ancestors: []gatev1.PolicyAncestorStatus{policyAncestorStatus},
|
||||
}
|
||||
if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
|
||||
log.Ctx(ctx).Warn().Err(err).Msg("Unable to update conflicting BackendTLSPolicy status")
|
||||
}
|
||||
statusReport.RecordBackendTLSPolicyStatus(ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policyAncestorStatus)
|
||||
|
||||
continue
|
||||
}
|
||||
|
|
@ -440,12 +423,7 @@ func (p *Provider) loadGRPCServers(ctx context.Context, namespace string, route
|
|||
})
|
||||
}
|
||||
|
||||
status := gatev1.PolicyStatus{
|
||||
Ancestors: []gatev1.PolicyAncestorStatus{policyAncestorStatus},
|
||||
}
|
||||
if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
|
||||
log.Ctx(ctx).Warn().Err(err).Msg("Unable to update BackendTLSPolicy status")
|
||||
}
|
||||
statusReport.RecordBackendTLSPolicyStatus(ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policyAncestorStatus)
|
||||
|
||||
// When something went wrong during the loading of a ServersTransport,
|
||||
// we stop here and return a route condition error.
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import (
|
|||
gatev1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
)
|
||||
|
||||
func (p *Provider) loadHTTPRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration) {
|
||||
func (p *Provider) loadHTTPRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration, statusReport *statusReport) {
|
||||
routes, err := p.client.ListHTTPRoutes()
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).Msg("Unable to list HTTPRoutes")
|
||||
|
|
@ -41,9 +41,8 @@ func (p *Provider) loadHTTPRoutes(ctx context.Context, gatewayListeners []gatewa
|
|||
continue
|
||||
}
|
||||
|
||||
var parentStatuses []gatev1.RouteParentStatus
|
||||
for _, parentRef := range route.Spec.ParentRefs {
|
||||
parentStatus := &gatev1.RouteParentStatus{
|
||||
parentStatus := gatev1.RouteParentStatus{
|
||||
ParentRef: parentRef,
|
||||
ControllerName: controllerName,
|
||||
Conditions: []metav1.Condition{
|
||||
|
|
@ -79,7 +78,7 @@ func (p *Provider) loadHTTPRoutes(ctx context.Context, gatewayListeners []gatewa
|
|||
}
|
||||
}
|
||||
|
||||
routeConf, resolveRefCondition := p.loadHTTPRoute(logger.WithContext(ctx), listener, route, hostnames)
|
||||
routeConf, resolveRefCondition := p.loadHTTPRoute(logger.WithContext(ctx), listener, route, hostnames, statusReport)
|
||||
if accepted && listener.Attached {
|
||||
mergeHTTPConfiguration(routeConf, conf)
|
||||
}
|
||||
|
|
@ -87,23 +86,12 @@ func (p *Provider) loadHTTPRoutes(ctx context.Context, gatewayListeners []gatewa
|
|||
parentStatus.Conditions = upsertRouteConditionResolvedRefs(parentStatus.Conditions, resolveRefCondition)
|
||||
}
|
||||
|
||||
parentStatuses = append(parentStatuses, *parentStatus)
|
||||
}
|
||||
|
||||
status := gatev1.HTTPRouteStatus{
|
||||
RouteStatus: gatev1.RouteStatus{
|
||||
Parents: parentStatuses,
|
||||
},
|
||||
}
|
||||
if err := p.client.UpdateHTTPRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, status); err != nil {
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update HTTPRoute status")
|
||||
statusReport.RecordHTTPRouteStatus(ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, parentStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) loadHTTPRoute(ctx context.Context, listener gatewayListener, route *gatev1.HTTPRoute, hostnames []gatev1.Hostname) (*dynamic.Configuration, metav1.Condition) {
|
||||
func (p *Provider) loadHTTPRoute(ctx context.Context, listener gatewayListener, route *gatev1.HTTPRoute, hostnames []gatev1.Hostname, statusReport *statusReport) (*dynamic.Configuration, metav1.Condition) {
|
||||
conf := &dynamic.Configuration{
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: make(map[string]*dynamic.Router),
|
||||
|
|
@ -176,7 +164,7 @@ func (p *Provider) loadHTTPRoute(ctx context.Context, listener gatewayListener,
|
|||
|
||||
default:
|
||||
var serviceCondition *metav1.Condition
|
||||
router.Service, serviceCondition = p.loadWRRService(ctx, listener, conf, routerName, routeRule, route, match.Path)
|
||||
router.Service, serviceCondition = p.loadWRRService(ctx, listener, conf, routerName, routeRule, route, match.Path, statusReport)
|
||||
if serviceCondition != nil {
|
||||
condition = *serviceCondition
|
||||
}
|
||||
|
|
@ -191,7 +179,7 @@ func (p *Provider) loadHTTPRoute(ctx context.Context, listener gatewayListener,
|
|||
return conf, condition
|
||||
}
|
||||
|
||||
func (p *Provider) loadWRRService(ctx context.Context, listener gatewayListener, conf *dynamic.Configuration, routeKey string, routeRule gatev1.HTTPRouteRule, route *gatev1.HTTPRoute, pathMatch *gatev1.HTTPPathMatch) (string, *metav1.Condition) {
|
||||
func (p *Provider) loadWRRService(ctx context.Context, listener gatewayListener, conf *dynamic.Configuration, routeKey string, routeRule gatev1.HTTPRouteRule, route *gatev1.HTTPRoute, pathMatch *gatev1.HTTPPathMatch, statusReport *statusReport) (string, *metav1.Condition) {
|
||||
name := routeKey + "-wrr"
|
||||
if _, ok := conf.HTTP.Services[name]; ok {
|
||||
return name, nil
|
||||
|
|
@ -202,7 +190,7 @@ func (p *Provider) loadWRRService(ctx context.Context, listener gatewayListener,
|
|||
for _, backendRef := range routeRule.BackendRefs {
|
||||
// TODO in loadService we need to always return a non-nil serviceName even when there is an error which is not the
|
||||
// usual defacto.
|
||||
svcName, errCondition := p.loadService(ctx, listener, conf, route, backendRef, pathMatch)
|
||||
svcName, errCondition := p.loadService(listener, conf, route, backendRef, pathMatch, statusReport)
|
||||
weight := ptr.To(int(ptr.Deref(backendRef.Weight, 1)))
|
||||
if errCondition != nil {
|
||||
log.Ctx(ctx).Error().
|
||||
|
|
@ -229,7 +217,7 @@ func (p *Provider) loadWRRService(ctx context.Context, listener gatewayListener,
|
|||
|
||||
// loadService returns a dynamic.Service config corresponding to the given gatev1.HTTPBackendRef.
|
||||
// Note that the returned dynamic.Service config can be nil (for cross-provider, internal services, and backendFunc).
|
||||
func (p *Provider) loadService(ctx context.Context, listener gatewayListener, conf *dynamic.Configuration, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef, pathMatch *gatev1.HTTPPathMatch) (string, *metav1.Condition) {
|
||||
func (p *Provider) loadService(listener gatewayListener, conf *dynamic.Configuration, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef, pathMatch *gatev1.HTTPPathMatch, statusReport *statusReport) (string, *metav1.Condition) {
|
||||
kind := ptr.Deref(backendRef.Kind, kindService)
|
||||
|
||||
group := groupCore
|
||||
|
|
@ -315,7 +303,7 @@ func (p *Provider) loadService(ctx context.Context, listener gatewayListener, co
|
|||
portStr := strconv.FormatInt(int64(port), 10)
|
||||
serviceName = provider.Normalize(serviceName + "-" + portStr)
|
||||
|
||||
lb, st, errCondition := p.loadHTTPServers(ctx, namespace, route, backendRef, listener)
|
||||
lb, st, errCondition := p.loadHTTPServers(namespace, route, backendRef, listener, statusReport)
|
||||
if errCondition != nil {
|
||||
return serviceName, errCondition
|
||||
}
|
||||
|
|
@ -442,7 +430,7 @@ func (p *Provider) loadHTTPRouteFilterExtensionRef(namespace string, extensionRe
|
|||
return filterFunc(string(extensionRef.Name), namespace)
|
||||
}
|
||||
|
||||
func (p *Provider) loadHTTPServers(ctx context.Context, namespace string, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef, listener gatewayListener) (*dynamic.ServersLoadBalancer, *dynamic.ServersTransport, *metav1.Condition) {
|
||||
func (p *Provider) loadHTTPServers(namespace string, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef, listener gatewayListener, statusReport *statusReport) (*dynamic.ServersLoadBalancer, *dynamic.ServersTransport, *metav1.Condition) {
|
||||
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef.BackendRef)
|
||||
if err != nil {
|
||||
return nil, nil, &metav1.Condition{
|
||||
|
|
@ -519,12 +507,7 @@ func (p *Provider) loadHTTPServers(ctx context.Context, namespace string, route
|
|||
},
|
||||
)
|
||||
|
||||
status := gatev1.PolicyStatus{
|
||||
Ancestors: []gatev1.PolicyAncestorStatus{policyAncestorStatus},
|
||||
}
|
||||
if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
|
||||
log.Ctx(ctx).Warn().Err(err).Msg("Unable to update conflicting BackendTLSPolicy status")
|
||||
}
|
||||
statusReport.RecordBackendTLSPolicyStatus(ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policyAncestorStatus)
|
||||
|
||||
continue
|
||||
}
|
||||
|
|
@ -551,12 +534,7 @@ func (p *Provider) loadHTTPServers(ctx context.Context, namespace string, route
|
|||
})
|
||||
}
|
||||
|
||||
status := gatev1.PolicyStatus{
|
||||
Ancestors: []gatev1.PolicyAncestorStatus{policyAncestorStatus},
|
||||
}
|
||||
if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
|
||||
log.Ctx(ctx).Warn().Err(err).Msg("Unable to update BackendTLSPolicy status")
|
||||
}
|
||||
statusReport.RecordBackendTLSPolicyStatus(ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policyAncestorStatus)
|
||||
|
||||
// When something wen wrong during the loading of a ServersTransport,
|
||||
// we stop here and return a route condition error.
|
||||
|
|
|
|||
|
|
@ -224,20 +224,29 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
|||
// Note that event is the *first* event that came in during this throttling interval -- if we're hitting our throttle, we may have dropped events.
|
||||
// This is fine, because we don't treat different event types differently.
|
||||
// But if we do in the future, we'll need to track more information about the dropped events.
|
||||
conf := p.loadConfigurationFromGateways(ctxLog)
|
||||
|
||||
confHash, err := hashstructure.Hash(conf, nil)
|
||||
switch {
|
||||
case err != nil:
|
||||
logger.Error().Msg("Unable to hash the configuration")
|
||||
case p.lastConfiguration.Get() == confHash:
|
||||
logger.Debug().Msgf("Skipping Kubernetes event kind %T", event)
|
||||
default:
|
||||
p.lastConfiguration.Set(confHash)
|
||||
configurationChan <- dynamic.Message{
|
||||
ProviderName: ProviderName,
|
||||
Configuration: conf,
|
||||
conf, statusReport, err := p.loadConfigurationFromGateways(ctxLog)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Unable to load configuration from Gateways")
|
||||
} else {
|
||||
confHash, err := hashstructure.Hash(conf, nil)
|
||||
switch {
|
||||
case err != nil:
|
||||
logger.Error().Msg("Unable to hash the configuration")
|
||||
case p.lastConfiguration.Get() == confHash:
|
||||
logger.Debug().Msgf("Skipping Kubernetes event kind %T", event)
|
||||
default:
|
||||
p.lastConfiguration.Set(confHash)
|
||||
configurationChan <- dynamic.Message{
|
||||
ProviderName: ProviderName,
|
||||
Configuration: conf,
|
||||
}
|
||||
}
|
||||
|
||||
// Flush regardless of whether the dynamic configuration changed: the
|
||||
// statusReport is independent of confHash and may carry writes even
|
||||
// when the data plane has nothing new to consume (e.g. a GatewayClass
|
||||
// that's now Accepted but has no Gateway pointing at it yet).
|
||||
statusReport.Flush(ctxLog, p.client)
|
||||
}
|
||||
|
||||
// If we're throttling,
|
||||
|
|
@ -304,7 +313,8 @@ func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) {
|
|||
}
|
||||
|
||||
// TODO Handle errors and update resources statuses (gatewayClass, gateway).
|
||||
func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.Configuration {
|
||||
func (p *Provider) loadConfigurationFromGateways(ctx context.Context) (*dynamic.Configuration, *statusReport, error) {
|
||||
statusReport := newStatusReport()
|
||||
conf := &dynamic.Configuration{
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: map[string]*dynamic.Router{},
|
||||
|
|
@ -327,14 +337,12 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C
|
|||
|
||||
addresses, err := p.gatewayAddresses()
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).Msg("Unable to get Gateway status addresses")
|
||||
return nil
|
||||
return nil, nil, fmt.Errorf("getting gateway addresses: %w", err)
|
||||
}
|
||||
|
||||
gatewayClasses, err := p.client.ListGatewayClasses()
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).Msg("Unable to list GatewayClasses")
|
||||
return nil
|
||||
return nil, nil, fmt.Errorf("listing gateway classes: %w", err)
|
||||
}
|
||||
|
||||
var supportedFeatures []gatev1.SupportedFeature
|
||||
|
|
@ -365,13 +373,7 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C
|
|||
SupportedFeatures: supportedFeatures,
|
||||
}
|
||||
|
||||
if err := p.client.UpdateGatewayClassStatus(ctx, gatewayClass.Name, status); err != nil {
|
||||
log.Ctx(ctx).
|
||||
Warn().
|
||||
Err(err).
|
||||
Str("gateway_class", gatewayClass.Name).
|
||||
Msg("Unable to update GatewayClass status")
|
||||
}
|
||||
statusReport.RecordGatewayClassStatus(gatewayClass.Name, status)
|
||||
}
|
||||
|
||||
var gateways []*gatev1.Gateway
|
||||
|
|
@ -392,14 +394,14 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C
|
|||
gatewayListeners = append(gatewayListeners, p.loadGatewayListeners(logger.WithContext(ctx), gateway, conf)...)
|
||||
}
|
||||
|
||||
p.loadHTTPRoutes(ctx, gatewayListeners, conf)
|
||||
p.loadHTTPRoutes(ctx, gatewayListeners, conf, statusReport)
|
||||
|
||||
p.loadGRPCRoutes(ctx, gatewayListeners, conf)
|
||||
p.loadGRPCRoutes(ctx, gatewayListeners, conf, statusReport)
|
||||
|
||||
p.loadTLSRoutes(ctx, gatewayListeners, conf)
|
||||
p.loadTLSRoutes(ctx, gatewayListeners, conf, statusReport)
|
||||
|
||||
if p.ExperimentalChannel {
|
||||
p.loadTCPRoutes(ctx, gatewayListeners, conf)
|
||||
p.loadTCPRoutes(ctx, gatewayListeners, conf, statusReport)
|
||||
}
|
||||
|
||||
for _, gateway := range gateways {
|
||||
|
|
@ -430,14 +432,10 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C
|
|||
Msg("Gateway Not Accepted")
|
||||
}
|
||||
|
||||
if err = p.client.UpdateGatewayStatus(ctx, ktypes.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}, gatewayStatus); err != nil {
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update Gateway status")
|
||||
}
|
||||
statusReport.RecordGatewayStatus(ktypes.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}, gatewayStatus)
|
||||
}
|
||||
|
||||
return conf
|
||||
return conf, statusReport, nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadGatewayListeners(ctx context.Context, gateway *gatev1.Gateway, conf *dynamic.Configuration) []gatewayListener {
|
||||
|
|
|
|||
|
|
@ -95,7 +95,10 @@ func TestGatewayClassLabelSelector(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
_ = p.loadConfigurationFromGateways(t.Context())
|
||||
_, statusReport, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
statusReport.Flush(t.Context(), p.client)
|
||||
|
||||
gw, err := gwClient.GatewayV1().Gateways("default").Get(t.Context(), "traefik-external", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
|
|
@ -2753,7 +2756,9 @@ func TestLoadHTTPRoutes(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -3216,7 +3221,9 @@ func TestLoadHTTPRoutes_backendExtensionRef(t *testing.T) {
|
|||
p.RegisterBackendFuncs(group, kind, backendFunc)
|
||||
}
|
||||
}
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -3502,7 +3509,9 @@ func TestLoadHTTPRoutes_filterExtensionRef(t *testing.T) {
|
|||
p.RegisterFilterFuncs(group, kind, filterFunc)
|
||||
}
|
||||
}
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -3684,7 +3693,9 @@ func TestLoadGRPCRoutes(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -3976,7 +3987,9 @@ func TestLoadGRPCRoutes_filterExtensionRef(t *testing.T) {
|
|||
p.RegisterFilterFuncs(group, kind, filterFunc)
|
||||
}
|
||||
}
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -4897,7 +4910,9 @@ func TestLoadTCPRoutes(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -6363,7 +6378,9 @@ func TestLoadTLSRoutes(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -7353,7 +7370,9 @@ func TestLoadMixedRoutes(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -7651,7 +7670,9 @@ func TestLoadRoutesWithReferenceGrants(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, test.expected, conf)
|
||||
})
|
||||
}
|
||||
|
|
@ -8815,7 +8836,8 @@ func TestCrossProviderNamespaces_HTTPRoute(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
router, ok := conf.HTTP.Routers["httproute-default-http-app-1-gw-default-my-gateway-ep-web-0-af329269dd38031b03e3"]
|
||||
require.True(t, ok)
|
||||
|
|
@ -8882,7 +8904,8 @@ func TestCrossProviderNamespaces_TCPRoute(t *testing.T) {
|
|||
ExperimentalChannel: true,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
require.NoError(t, err)
|
||||
|
||||
router, ok := conf.TCP.Routers["tcproute-default-tcp-app-1-gw-default-my-gateway-ep-tcp-0-e3b0c44298fc1c149afb"]
|
||||
require.True(t, ok)
|
||||
|
|
@ -8955,7 +8978,8 @@ func TestCrossProviderNamespaces_TLSRoute(t *testing.T) {
|
|||
client: client,
|
||||
}
|
||||
|
||||
conf := p.loadConfigurationFromGateways(t.Context())
|
||||
conf, _, err := p.loadConfigurationFromGateways(t.Context())
|
||||
assert.NoError(t, err)
|
||||
|
||||
fmt.Println(conf.TCP.Routers)
|
||||
|
||||
|
|
|
|||
136
pkg/provider/kubernetes/gateway/status.go
Normal file
136
pkg/provider/kubernetes/gateway/status.go
Normal file
|
|
@ -0,0 +1,136 @@
|
|||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
ktypes "k8s.io/apimachinery/pkg/types"
|
||||
gatev1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
gatev1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
|
||||
)
|
||||
|
||||
// statusReport collects the status writes produced by a single rebuild so they
|
||||
// can be flushed to the apiserver after the dynamic configuration has been published.
|
||||
type statusReport struct {
|
||||
gatewayClasses map[string]gatev1.GatewayClassStatus
|
||||
gateways map[ktypes.NamespacedName]gatev1.GatewayStatus
|
||||
httpRoutes map[ktypes.NamespacedName]gatev1.RouteStatus
|
||||
grpcRoutes map[ktypes.NamespacedName]gatev1.RouteStatus
|
||||
tcpRoutes map[ktypes.NamespacedName]gatev1.RouteStatus
|
||||
tlsRoutes map[ktypes.NamespacedName]gatev1.RouteStatus
|
||||
backendTLSPolicies map[ktypes.NamespacedName]gatev1.PolicyStatus
|
||||
}
|
||||
|
||||
func newStatusReport() *statusReport {
|
||||
return &statusReport{
|
||||
gatewayClasses: map[string]gatev1.GatewayClassStatus{},
|
||||
gateways: map[ktypes.NamespacedName]gatev1.GatewayStatus{},
|
||||
httpRoutes: map[ktypes.NamespacedName]gatev1.RouteStatus{},
|
||||
grpcRoutes: map[ktypes.NamespacedName]gatev1.RouteStatus{},
|
||||
tcpRoutes: map[ktypes.NamespacedName]gatev1.RouteStatus{},
|
||||
tlsRoutes: map[ktypes.NamespacedName]gatev1.RouteStatus{},
|
||||
backendTLSPolicies: map[ktypes.NamespacedName]gatev1.PolicyStatus{},
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sends every status write collected during the
|
||||
// routing configuration build to the Kubernetes API server.
|
||||
func (r *statusReport) Flush(ctx context.Context, client *clientWrapper) {
|
||||
logger := log.Ctx(ctx)
|
||||
|
||||
for name, status := range r.gatewayClasses {
|
||||
if err := client.UpdateGatewayClassStatus(ctx, name, status); err != nil {
|
||||
logger.Warn().Err(err).Str("gateway_class", name).Msg("Unable to update GatewayClass status")
|
||||
}
|
||||
}
|
||||
|
||||
for name, status := range r.gateways {
|
||||
if err := client.UpdateGatewayStatus(ctx, name, status); err != nil {
|
||||
logger.Warn().Err(err).Str("gateway", name.Name).Str("namespace", name.Namespace).Msg("Unable to update Gateway status")
|
||||
}
|
||||
}
|
||||
|
||||
for name, routeStatus := range r.httpRoutes {
|
||||
status := gatev1.HTTPRouteStatus{RouteStatus: routeStatus}
|
||||
if err := client.UpdateHTTPRouteStatus(ctx, name, status); err != nil {
|
||||
logger.Warn().Err(err).Str("http_route", name.Name).Str("namespace", name.Namespace).Msg("Unable to update HTTPRoute status")
|
||||
}
|
||||
}
|
||||
|
||||
for name, routeStatus := range r.grpcRoutes {
|
||||
status := gatev1.GRPCRouteStatus{RouteStatus: routeStatus}
|
||||
if err := client.UpdateGRPCRouteStatus(ctx, name, status); err != nil {
|
||||
logger.Warn().Err(err).Str("grpc_route", name.Name).Str("namespace", name.Namespace).Msg("Unable to update GRPCRoute status")
|
||||
}
|
||||
}
|
||||
|
||||
for name, routeStatus := range r.tcpRoutes {
|
||||
status := gatev1alpha2.TCPRouteStatus{RouteStatus: routeStatus}
|
||||
if err := client.UpdateTCPRouteStatus(ctx, name, status); err != nil {
|
||||
logger.Warn().Err(err).Str("tcp_route", name.Name).Str("namespace", name.Namespace).Msg("Unable to update TCPRoute status")
|
||||
}
|
||||
}
|
||||
|
||||
for name, routeStatus := range r.tlsRoutes {
|
||||
status := gatev1.TLSRouteStatus{RouteStatus: routeStatus}
|
||||
if err := client.UpdateTLSRouteStatus(ctx, name, status); err != nil {
|
||||
logger.Warn().Err(err).Str("tls_route", name.Name).Str("namespace", name.Namespace).Msg("Unable to update TLSRoute status")
|
||||
}
|
||||
}
|
||||
|
||||
for name, policyStatus := range r.backendTLSPolicies {
|
||||
if err := client.UpdateBackendTLSPolicyStatus(ctx, name, policyStatus); err != nil {
|
||||
logger.Warn().Err(err).Str("backend_tls_policy", name.Name).Str("namespace", name.Namespace).Msg("Unable to update BackendTLSPolicy status")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *statusReport) RecordGatewayClassStatus(gatewayClassName string, status gatev1.GatewayClassStatus) {
|
||||
r.gatewayClasses[gatewayClassName] = status
|
||||
}
|
||||
|
||||
func (r *statusReport) RecordGatewayStatus(gateway ktypes.NamespacedName, status gatev1.GatewayStatus) {
|
||||
r.gateways[gateway] = status
|
||||
}
|
||||
|
||||
func (r *statusReport) RecordHTTPRouteStatus(route ktypes.NamespacedName, status gatev1.RouteParentStatus) {
|
||||
r.httpRoutes[route] = gatev1.RouteStatus{
|
||||
Parents: append(r.httpRoutes[route].Parents, status),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *statusReport) RecordGRPCRouteStatus(route ktypes.NamespacedName, status gatev1.RouteParentStatus) {
|
||||
r.grpcRoutes[route] = gatev1.RouteStatus{
|
||||
Parents: append(r.grpcRoutes[route].Parents, status),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *statusReport) RecordTCPRouteStatus(route ktypes.NamespacedName, status gatev1.RouteParentStatus) {
|
||||
r.tcpRoutes[route] = gatev1.RouteStatus{
|
||||
Parents: append(r.tcpRoutes[route].Parents, status),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *statusReport) RecordTLSRouteStatus(route ktypes.NamespacedName, status gatev1.RouteParentStatus) {
|
||||
r.tlsRoutes[route] = gatev1.RouteStatus{
|
||||
Parents: append(r.tlsRoutes[route].Parents, status),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *statusReport) RecordBackendTLSPolicyStatus(policy ktypes.NamespacedName, status gatev1.PolicyAncestorStatus) {
|
||||
var ancestors []gatev1.PolicyAncestorStatus
|
||||
|
||||
// Keep existing ancestor statuses, except if it matches the status to merge.
|
||||
for _, existing := range r.backendTLSPolicies[policy].Ancestors {
|
||||
if reflect.DeepEqual(existing.AncestorRef, status.AncestorRef) {
|
||||
continue
|
||||
}
|
||||
|
||||
ancestors = append(ancestors, existing)
|
||||
}
|
||||
|
||||
r.backendTLSPolicies[policy] = gatev1.PolicyStatus{
|
||||
Ancestors: append(ancestors, status), // Add the new status to the existing ancestors statuses.
|
||||
}
|
||||
}
|
||||
152
pkg/provider/kubernetes/gateway/status_test.go
Normal file
152
pkg/provider/kubernetes/gateway/status_test.go
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
package gateway
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
ktypes "k8s.io/apimachinery/pkg/types"
|
||||
gatev1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
)
|
||||
|
||||
func TestStatusReport_RecordGatewayClassStatus(t *testing.T) {
|
||||
report := newStatusReport()
|
||||
|
||||
accepted := gatev1.GatewayClassStatus{
|
||||
Conditions: []metav1.Condition{{Type: string(gatev1.GatewayClassConditionStatusAccepted)}},
|
||||
}
|
||||
report.RecordGatewayClassStatus("traefik", accepted)
|
||||
assert.Equal(t, accepted, report.gatewayClasses["traefik"])
|
||||
|
||||
// A later record for the same GatewayClass overwrites the previous one.
|
||||
unsupported := gatev1.GatewayClassStatus{
|
||||
Conditions: []metav1.Condition{{Type: string(gatev1.GatewayClassReasonUnsupportedVersion)}},
|
||||
}
|
||||
report.RecordGatewayClassStatus("traefik", unsupported)
|
||||
assert.Equal(t, unsupported, report.gatewayClasses["traefik"])
|
||||
}
|
||||
|
||||
func TestStatusReport_RecordGatewayStatus(t *testing.T) {
|
||||
report := newStatusReport()
|
||||
gateway := ktypes.NamespacedName{Namespace: "default", Name: "my-gateway"}
|
||||
|
||||
accepted := gatev1.GatewayStatus{
|
||||
Conditions: []metav1.Condition{{Type: string(gatev1.GatewayConditionAccepted)}},
|
||||
}
|
||||
report.RecordGatewayStatus(gateway, accepted)
|
||||
assert.Equal(t, accepted, report.gateways[gateway])
|
||||
|
||||
// A later record for the same Gateway overwrites the previous one.
|
||||
programmed := gatev1.GatewayStatus{
|
||||
Conditions: []metav1.Condition{{Type: string(gatev1.GatewayConditionProgrammed)}},
|
||||
}
|
||||
report.RecordGatewayStatus(gateway, programmed)
|
||||
assert.Equal(t, programmed, report.gateways[gateway])
|
||||
}
|
||||
|
||||
func TestStatusReport_RecordHTTPRouteStatus(t *testing.T) {
|
||||
report := newStatusReport()
|
||||
route := ktypes.NamespacedName{Namespace: "default", Name: "my-route"}
|
||||
|
||||
gatewayParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "gateway"}}
|
||||
otherParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "other-gateway"}}
|
||||
|
||||
report.RecordHTTPRouteStatus(route, gatewayParent)
|
||||
report.RecordHTTPRouteStatus(route, otherParent)
|
||||
|
||||
// Each parentRef accumulates as a distinct parent status.
|
||||
assert.Equal(t, []gatev1.RouteParentStatus{gatewayParent, otherParent}, report.httpRoutes[route].Parents)
|
||||
}
|
||||
|
||||
func TestStatusReport_RecordGRPCRouteStatus(t *testing.T) {
|
||||
report := newStatusReport()
|
||||
route := ktypes.NamespacedName{Namespace: "default", Name: "my-route"}
|
||||
|
||||
gatewayParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "gateway"}}
|
||||
otherParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "other-gateway"}}
|
||||
|
||||
report.RecordGRPCRouteStatus(route, gatewayParent)
|
||||
report.RecordGRPCRouteStatus(route, otherParent)
|
||||
|
||||
assert.Equal(t, []gatev1.RouteParentStatus{gatewayParent, otherParent}, report.grpcRoutes[route].Parents)
|
||||
}
|
||||
|
||||
func TestStatusReport_RecordTCPRouteStatus(t *testing.T) {
|
||||
report := newStatusReport()
|
||||
route := ktypes.NamespacedName{Namespace: "default", Name: "my-route"}
|
||||
|
||||
gatewayParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "gateway"}}
|
||||
otherParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "other-gateway"}}
|
||||
|
||||
report.RecordTCPRouteStatus(route, gatewayParent)
|
||||
report.RecordTCPRouteStatus(route, otherParent)
|
||||
|
||||
assert.Equal(t, []gatev1.RouteParentStatus{gatewayParent, otherParent}, report.tcpRoutes[route].Parents)
|
||||
}
|
||||
|
||||
func TestStatusReport_RecordTLSRouteStatus(t *testing.T) {
|
||||
report := newStatusReport()
|
||||
route := ktypes.NamespacedName{Namespace: "default", Name: "my-route"}
|
||||
|
||||
gatewayParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "gateway"}}
|
||||
otherParent := gatev1.RouteParentStatus{ParentRef: gatev1.ParentReference{Name: "other-gateway"}}
|
||||
|
||||
report.RecordTLSRouteStatus(route, gatewayParent)
|
||||
report.RecordTLSRouteStatus(route, otherParent)
|
||||
|
||||
assert.Equal(t, []gatev1.RouteParentStatus{gatewayParent, otherParent}, report.tlsRoutes[route].Parents)
|
||||
}
|
||||
|
||||
func TestStatusReport_RecordBackendTLSPolicyStatus(t *testing.T) {
|
||||
gatewayAncestor := gatev1.PolicyAncestorStatus{
|
||||
AncestorRef: gatev1.ParentReference{Name: "gateway"},
|
||||
ControllerName: controllerName,
|
||||
}
|
||||
otherAncestor := gatev1.PolicyAncestorStatus{
|
||||
AncestorRef: gatev1.ParentReference{Name: "other-gateway"},
|
||||
ControllerName: controllerName,
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
records []gatev1.PolicyAncestorStatus
|
||||
expected []gatev1.PolicyAncestorStatus
|
||||
}{
|
||||
{
|
||||
desc: "distinct ancestor refs accumulate",
|
||||
records: []gatev1.PolicyAncestorStatus{gatewayAncestor, otherAncestor},
|
||||
expected: []gatev1.PolicyAncestorStatus{gatewayAncestor, otherAncestor},
|
||||
},
|
||||
{
|
||||
desc: "same ancestor ref is replaced, not duplicated",
|
||||
records: []gatev1.PolicyAncestorStatus{
|
||||
gatewayAncestor,
|
||||
{
|
||||
AncestorRef: gatev1.ParentReference{Name: "gateway"},
|
||||
ControllerName: "another.io/controller",
|
||||
},
|
||||
},
|
||||
expected: []gatev1.PolicyAncestorStatus{
|
||||
{
|
||||
AncestorRef: gatev1.ParentReference{Name: "gateway"},
|
||||
ControllerName: "another.io/controller",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
report := newStatusReport()
|
||||
policy := ktypes.NamespacedName{Namespace: "default", Name: "my-policy"}
|
||||
|
||||
for _, record := range test.records {
|
||||
report.RecordBackendTLSPolicyStatus(policy, record)
|
||||
}
|
||||
|
||||
assert.Equal(t, test.expected, report.backendTLSPolicies[policy].Ancestors)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -19,7 +19,7 @@ import (
|
|||
gatev1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
|
||||
)
|
||||
|
||||
func (p *Provider) loadTCPRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration) {
|
||||
func (p *Provider) loadTCPRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration, statusReport *statusReport) {
|
||||
logger := log.Ctx(ctx)
|
||||
routes, err := p.client.ListTCPRoutes()
|
||||
if err != nil {
|
||||
|
|
@ -28,19 +28,13 @@ func (p *Provider) loadTCPRoutes(ctx context.Context, gatewayListeners []gateway
|
|||
}
|
||||
|
||||
for _, route := range routes {
|
||||
logger := log.Ctx(ctx).With().
|
||||
Str("tcp_route", route.Name).
|
||||
Str("namespace", route.Namespace).
|
||||
Logger()
|
||||
|
||||
routeListeners := matchingGatewayListeners(gatewayListeners, route.Namespace, route.Spec.ParentRefs)
|
||||
if len(routeListeners) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var parentStatuses []gatev1alpha2.RouteParentStatus
|
||||
for _, parentRef := range route.Spec.ParentRefs {
|
||||
parentStatus := &gatev1alpha2.RouteParentStatus{
|
||||
parentStatus := gatev1alpha2.RouteParentStatus{
|
||||
ParentRef: parentRef,
|
||||
ControllerName: controllerName,
|
||||
Conditions: []metav1.Condition{
|
||||
|
|
@ -77,18 +71,7 @@ func (p *Provider) loadTCPRoutes(ctx context.Context, gatewayListeners []gateway
|
|||
parentStatus.Conditions = upsertRouteConditionResolvedRefs(parentStatus.Conditions, resolveRefCondition)
|
||||
}
|
||||
|
||||
parentStatuses = append(parentStatuses, *parentStatus)
|
||||
}
|
||||
|
||||
routeStatus := gatev1alpha2.TCPRouteStatus{
|
||||
RouteStatus: gatev1alpha2.RouteStatus{
|
||||
Parents: parentStatuses,
|
||||
},
|
||||
}
|
||||
if err := p.client.UpdateTCPRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, routeStatus); err != nil {
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update TCPRoute status")
|
||||
statusReport.RecordTCPRouteStatus(ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, parentStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import (
|
|||
gatev1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
)
|
||||
|
||||
func (p *Provider) loadTLSRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration) {
|
||||
func (p *Provider) loadTLSRoutes(ctx context.Context, gatewayListeners []gatewayListener, conf *dynamic.Configuration, statusReport *statusReport) {
|
||||
logger := log.Ctx(ctx)
|
||||
routes, err := p.client.ListTLSRoutes()
|
||||
if err != nil {
|
||||
|
|
@ -29,18 +29,13 @@ func (p *Provider) loadTLSRoutes(ctx context.Context, gatewayListeners []gateway
|
|||
}
|
||||
|
||||
for _, route := range routes {
|
||||
logger := log.Ctx(ctx).With().
|
||||
Str("tls_route", route.Name).
|
||||
Str("namespace", route.Namespace).Logger()
|
||||
|
||||
routeListeners := matchingGatewayListeners(gatewayListeners, route.Namespace, route.Spec.ParentRefs)
|
||||
if len(routeListeners) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var parentStatuses []gatev1.RouteParentStatus
|
||||
for _, parentRef := range route.Spec.ParentRefs {
|
||||
parentStatus := &gatev1.RouteParentStatus{
|
||||
parentStatus := gatev1.RouteParentStatus{
|
||||
ParentRef: parentRef,
|
||||
ControllerName: controllerName,
|
||||
Conditions: []metav1.Condition{
|
||||
|
|
@ -75,14 +70,14 @@ func (p *Provider) loadTLSRoutes(ctx context.Context, gatewayListeners []gateway
|
|||
}
|
||||
}
|
||||
|
||||
routeConf, resolveRefCondition := p.loadTLSRoute(ctx, listener, route, hostnames)
|
||||
routeConf, resolveRefCondition := p.loadTLSRoute(listener, route, hostnames, statusReport)
|
||||
if accepted && listener.Attached {
|
||||
mergeTCPConfiguration(routeConf, conf)
|
||||
}
|
||||
parentStatus.Conditions = upsertRouteConditionResolvedRefs(parentStatus.Conditions, resolveRefCondition)
|
||||
}
|
||||
|
||||
parentStatuses = append(parentStatuses, *parentStatus)
|
||||
statusReport.RecordTLSRouteStatus(ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, parentStatus)
|
||||
}
|
||||
|
||||
// When there is at least one TLS listener, we add a default deny-all route to avoid accepting traffic for undefined hosts.
|
||||
|
|
@ -98,21 +93,10 @@ func (p *Provider) loadTLSRoutes(ctx context.Context, gatewayListeners []gateway
|
|||
LoadBalancer: &dynamic.TCPServersLoadBalancer{},
|
||||
}
|
||||
}
|
||||
|
||||
routeStatus := gatev1.TLSRouteStatus{
|
||||
RouteStatus: gatev1.RouteStatus{
|
||||
Parents: parentStatuses,
|
||||
},
|
||||
}
|
||||
if err := p.client.UpdateTLSRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, routeStatus); err != nil {
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update TLSRoute status")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) loadTLSRoute(ctx context.Context, listener gatewayListener, route *gatev1.TLSRoute, hostnames []gatev1.Hostname) (*dynamic.Configuration, metav1.Condition) {
|
||||
func (p *Provider) loadTLSRoute(listener gatewayListener, route *gatev1.TLSRoute, hostnames []gatev1.Hostname, statusReport *statusReport) (*dynamic.Configuration, metav1.Condition) {
|
||||
conf := &dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: make(map[string]*dynamic.TCPRouter),
|
||||
|
|
@ -173,7 +157,7 @@ func (p *Provider) loadTLSRoute(ctx context.Context, listener gatewayListener, r
|
|||
}
|
||||
|
||||
var serviceCondition *metav1.Condition
|
||||
router.Service, serviceCondition = p.loadTLSWRRService(ctx, listener, conf, routerName, routeRule.BackendRefs, route)
|
||||
router.Service, serviceCondition = p.loadTLSWRRService(listener, conf, routerName, routeRule.BackendRefs, route, statusReport)
|
||||
if serviceCondition != nil {
|
||||
condition = *serviceCondition
|
||||
}
|
||||
|
|
@ -185,7 +169,7 @@ func (p *Provider) loadTLSRoute(ctx context.Context, listener gatewayListener, r
|
|||
}
|
||||
|
||||
// loadTLSWRRService is generating a WRR service, even when there is only one target.
|
||||
func (p *Provider) loadTLSWRRService(ctx context.Context, listener gatewayListener, conf *dynamic.Configuration, routeKey string, backendRefs []gatev1.BackendRef, route *gatev1.TLSRoute) (string, *metav1.Condition) {
|
||||
func (p *Provider) loadTLSWRRService(listener gatewayListener, conf *dynamic.Configuration, routeKey string, backendRefs []gatev1.BackendRef, route *gatev1.TLSRoute, statusReport *statusReport) (string, *metav1.Condition) {
|
||||
name := routeKey + "-wrr"
|
||||
if _, ok := conf.TCP.Services[name]; ok {
|
||||
return name, nil
|
||||
|
|
@ -194,7 +178,7 @@ func (p *Provider) loadTLSWRRService(ctx context.Context, listener gatewayListen
|
|||
var wrr dynamic.TCPWeightedRoundRobin
|
||||
var condition *metav1.Condition
|
||||
for _, backendRef := range backendRefs {
|
||||
svcName, svc, errCondition := p.loadTLSService(ctx, listener, conf, route, backendRef)
|
||||
svcName, svc, errCondition := p.loadTLSService(listener, conf, route, backendRef, statusReport)
|
||||
weight := ptr.To(int(ptr.Deref(backendRef.Weight, 1)))
|
||||
|
||||
if errCondition != nil {
|
||||
|
|
@ -228,7 +212,7 @@ func (p *Provider) loadTLSWRRService(ctx context.Context, listener gatewayListen
|
|||
return name, condition
|
||||
}
|
||||
|
||||
func (p *Provider) loadTLSService(ctx context.Context, listener gatewayListener, conf *dynamic.Configuration, route *gatev1.TLSRoute, backendRef gatev1.BackendRef) (string, *dynamic.TCPService, *metav1.Condition) {
|
||||
func (p *Provider) loadTLSService(listener gatewayListener, conf *dynamic.Configuration, route *gatev1.TLSRoute, backendRef gatev1.BackendRef, statusReport *statusReport) (string, *dynamic.TCPService, *metav1.Condition) {
|
||||
kind := ptr.Deref(backendRef.Kind, kindService)
|
||||
|
||||
group := groupCore
|
||||
|
|
@ -296,7 +280,7 @@ func (p *Provider) loadTLSService(ctx context.Context, listener gatewayListener,
|
|||
portStr := strconv.FormatInt(int64(port), 10)
|
||||
serviceName = provider.Normalize(serviceName + "-" + portStr)
|
||||
|
||||
lb, st, errCondition := p.loadTLSServers(ctx, namespace, route, backendRef, listener)
|
||||
lb, st, errCondition := p.loadTLSServers(namespace, route, backendRef, listener, statusReport)
|
||||
if errCondition != nil {
|
||||
return serviceName, nil, errCondition
|
||||
}
|
||||
|
|
@ -309,7 +293,7 @@ func (p *Provider) loadTLSService(ctx context.Context, listener gatewayListener,
|
|||
return serviceName, &dynamic.TCPService{LoadBalancer: lb}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadTLSServers(ctx context.Context, namespace string, route *gatev1.TLSRoute, backendRef gatev1.BackendRef, listener gatewayListener) (*dynamic.TCPServersLoadBalancer, *dynamic.TCPServersTransport, *metav1.Condition) {
|
||||
func (p *Provider) loadTLSServers(namespace string, route *gatev1.TLSRoute, backendRef gatev1.BackendRef, listener gatewayListener, statusReport *statusReport) (*dynamic.TCPServersLoadBalancer, *dynamic.TCPServersTransport, *metav1.Condition) {
|
||||
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef)
|
||||
if err != nil {
|
||||
return nil, nil, &metav1.Condition{
|
||||
|
|
@ -386,12 +370,7 @@ func (p *Provider) loadTLSServers(ctx context.Context, namespace string, route *
|
|||
},
|
||||
)
|
||||
|
||||
status := gatev1.PolicyStatus{
|
||||
Ancestors: []gatev1.PolicyAncestorStatus{policyAncestorStatus},
|
||||
}
|
||||
if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
|
||||
log.Ctx(ctx).Warn().Err(err).Msg("Unable to update conflicting BackendTLSPolicy status")
|
||||
}
|
||||
statusReport.RecordBackendTLSPolicyStatus(ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policyAncestorStatus)
|
||||
|
||||
continue
|
||||
}
|
||||
|
|
@ -418,12 +397,7 @@ func (p *Provider) loadTLSServers(ctx context.Context, namespace string, route *
|
|||
})
|
||||
}
|
||||
|
||||
status := gatev1.PolicyStatus{
|
||||
Ancestors: []gatev1.PolicyAncestorStatus{policyAncestorStatus},
|
||||
}
|
||||
if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
|
||||
log.Ctx(ctx).Warn().Err(err).Msg("Unable to update BackendTLSPolicy status")
|
||||
}
|
||||
statusReport.RecordBackendTLSPolicyStatus(ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policyAncestorStatus)
|
||||
|
||||
// When something went wrong during the loading of a ServersTransport,
|
||||
// we stop here and return a route condition error.
|
||||
|
|
|
|||
Loading…
Reference in a new issue